Flink WindowJoin

Author: kaiker | Published 2022-03-30 19:25

    JoinedStreams

    The runWindowJoin example from the Flink source code

        public static DataStream<Tuple3<String, Integer, Integer>> runWindowJoin(
                DataStream<Tuple2<String, Integer>> grades,
                DataStream<Tuple2<String, Integer>> salaries,
                long windowSize) {
    
            return grades.join(salaries)
                    .where(new NameKeySelector())
                    .equalTo(new NameKeySelector())
                    .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
                    .apply(
                            new JoinFunction<
                                    Tuple2<String, Integer>,
                                    Tuple2<String, Integer>,
                                    Tuple3<String, Integer, Integer>>() {
    
                                @Override
                                public Tuple3<String, Integer, Integer> join(
                                        Tuple2<String, Integer> first, Tuple2<String, Integer> second) {
                                    return new Tuple3<String, Integer, Integer>(
                                            first.f0, first.f1, second.f1);
                                }
                            });
        }
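
    To make the semantics of the example concrete, here is a minimal plain-Java sketch with no Flink dependency. It assumes bounded input lists, a zero window offset and non-negative timestamps; the Event class and the method names are hypothetical and exist only for illustration. Two elements are joined only when they share the same key and fall into the same tumbling window.

```java
import java.util.ArrayList;
import java.util.List;

public class TumblingJoinSketch {

    /** Hypothetical event: a name (the join key), a value and an event-time timestamp in ms. */
    public static final class Event {
        final String name;
        final int value;
        final long ts;

        public Event(String name, int value, long ts) {
            this.name = name;
            this.value = value;
            this.ts = ts;
        }
    }

    /** Start of the tumbling window containing ts (assumes zero offset and ts >= 0). */
    public static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    /** Pairs elements that have the same key and the same window, like an inner window join. */
    public static List<String> join(List<Event> grades, List<Event> salaries, long size) {
        List<String> out = new ArrayList<>();
        for (Event g : grades) {
            for (Event s : salaries) {
                boolean sameKey = g.name.equals(s.name);
                boolean sameWindow = windowStart(g.ts, size) == windowStart(s.ts, size);
                if (sameKey && sameWindow) {
                    out.add(g.name + "," + g.value + "," + s.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> grades = List.of(new Event("tom", 3, 100), new Event("ann", 5, 1900));
        List<Event> salaries = List.of(new Event("tom", 9000, 900), new Event("ann", 7000, 2100));
        // "ann" is dropped: 1900 is in window [0, 2000), 2100 is in window [2000, 4000)
        System.out.println(join(grades, salaries, 2000)); // prints [tom,3,9000]
    }
}
```

    Flink does this incrementally per key and window rather than with a global nested loop; the sketch only shows which pairs end up in the result.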
    
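    • Calling .join() directly on a DataStream returns a JoinedStreams
    • The user-supplied join function is finally passed in through .apply(), which delegates to a coGroup over the same keys and window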
            public <T> DataStream<T> apply(
                    JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
                // clean the closure
                function = input1.getExecutionEnvironment().clean(function);
    
                coGroupedWindowedStream =
                        input1.coGroup(input2)
                                .where(keySelector1)
                                .equalTo(keySelector2)
                                .window(windowAssigner)
                                .trigger(trigger)
                                .evictor(evictor)
                                .allowedLateness(allowedLateness);
    
                return coGroupedWindowedStream.apply(new JoinCoGroupFunction<>(function), resultType);
            }
    
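    • In the apply of coGroupedWindowedStream you can see that both inputs are mapped to TaggedUnion and unioned into one stream, which is then keyed, windowed and handed to WindowedStream's apply
    // CoGroupedStreams.WithWindow#apply (the type of coGroupedWindowedStream)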
            public <T> DataStream<T> apply(
                    CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
                // clean the closure
                function = input1.getExecutionEnvironment().clean(function);
    
                UnionTypeInfo<T1, T2> unionType =
                        new UnionTypeInfo<>(input1.getType(), input2.getType());
                UnionKeySelector<T1, T2, KEY> unionKeySelector =
                        new UnionKeySelector<>(keySelector1, keySelector2);
    
                DataStream<TaggedUnion<T1, T2>> taggedInput1 =
                        input1.map(new Input1Tagger<T1, T2>())
                                .setParallelism(input1.getParallelism())
                                .returns(unionType);
                DataStream<TaggedUnion<T1, T2>> taggedInput2 =
                        input2.map(new Input2Tagger<T1, T2>())
                                .setParallelism(input2.getParallelism())
                                .returns(unionType);
    
                DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
    
                // we explicitly create the keyed stream to manually pass the key type information in
                windowedStream =
                        new KeyedStream<TaggedUnion<T1, T2>, KEY>(
                                        unionStream, unionKeySelector, keyType)
                                .window(windowAssigner);
    
                if (trigger != null) {
                    windowedStream.trigger(trigger);
                }
                if (evictor != null) {
                    windowedStream.evictor(evictor);
                }
                if (allowedLateness != null) {
                    windowedStream.allowedLateness(allowedLateness);
                }
    
                return windowedStream.apply(
                        new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
            }
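
    The tag-and-union step can be sketched in plain Java without Flink. The Tagged class below is a hypothetical stand-in for TaggedUnion, and split is an assumption about what the window function has to do before it can call the two-input CoGroupFunction: separate the unioned window contents back into the two sides.

```java
import java.util.ArrayList;
import java.util.List;

public class TaggedUnionSketch {

    /** Hypothetical stand-in for TaggedUnion: holds a value from exactly one input. */
    public static final class Tagged<T1, T2> {
        final T1 valueOne;
        final T2 valueTwo;
        final boolean isOne;

        private Tagged(T1 valueOne, T2 valueTwo, boolean isOne) {
            this.valueOne = valueOne;
            this.valueTwo = valueTwo;
            this.isOne = isOne;
        }

        /** Tags an element of the first input (the role of Input1Tagger). */
        public static <T1, T2> Tagged<T1, T2> one(T1 value) {
            return new Tagged<>(value, null, true);
        }

        /** Tags an element of the second input (the role of Input2Tagger). */
        public static <T1, T2> Tagged<T1, T2> two(T2 value) {
            return new Tagged<>(null, value, false);
        }
    }

    /** Splits the unioned contents of one window back into the two input sides. */
    public static <T1, T2> void split(
            List<Tagged<T1, T2>> window, List<T1> ones, List<T2> twos) {
        for (Tagged<T1, T2> element : window) {
            if (element.isOne) {
                ones.add(element.valueOne);
            } else {
                twos.add(element.valueTwo);
            }
        }
    }

    public static void main(String[] args) {
        // One window of a single key, containing elements of both inputs in arrival order
        List<Tagged<String, Integer>> window = new ArrayList<>();
        window.add(Tagged.one("a"));
        window.add(Tagged.two(1));
        window.add(Tagged.one("b"));

        List<String> ones = new ArrayList<>();
        List<Integer> twos = new ArrayList<>();
        split(window, ones, twos);
        System.out.println(ones + " " + twos); // prints [a, b] [1]
    }
}
```

    Tagging lets a single-input window operator carry two differently typed streams, so the join can reuse the existing WindowedStream machinery instead of needing a dedicated two-input window operator.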
    
    // WindowedStream#apply
        public <R> SingleOutputStreamOperator<R> apply(
                WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
            function = input.getExecutionEnvironment().clean(function);
    
            final String opName = builder.generateOperatorName();
            final String opDescription = builder.generateOperatorDescription(function, null);
            OneInputStreamOperator<T, R> operator = builder.apply(function);
    
            return input.transform(opName, resultType, operator).setDescription(opDescription);
        }
    
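    • JoinCoGroupFunction<>(function) wraps the user's JoinFunction implementation: for each key and window, coGroup calls join on every pair of elements from the two inputs.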
        private static class JoinCoGroupFunction<T1, T2, T>
                extends WrappingFunction<JoinFunction<T1, T2, T>>
                implements CoGroupFunction<T1, T2, T> {
            private static final long serialVersionUID = 1L;
    
            public JoinCoGroupFunction(JoinFunction<T1, T2, T> wrappedFunction) {
                super(wrappedFunction);
            }
    
            @Override
            public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out)
                    throws Exception {
                for (T1 val1 : first) {
                    for (T2 val2 : second) {
                        out.collect(wrappedFunction.join(val1, val2));
                    }
                }
            }
        }
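
    The nested loop above gives the window join inner-join semantics: n elements on one side and m on the other produce n × m results, and a window in which one side is empty produces nothing. A minimal plain-Java sketch of the same loop, with a BiFunction standing in for JoinFunction (the class and method names here are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

public class JoinAsCoGroupSketch {

    /** Same shape as the nested loop in coGroup: pair every element of first with every element of second. */
    public static <A, B, R> List<R> coGroup(
            List<A> first, List<B> second, BiFunction<A, B, R> join) {
        List<R> out = new ArrayList<>();
        for (A a : first) {
            for (B b : second) {
                out.add(join.apply(a, b));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> grades = List.of(3, 5);
        List<Integer> salaries = List.of(1000, 2000);

        // 2 x 2 elements produce 4 joined results
        System.out.println(coGroup(grades, salaries, (g, s) -> g + ":" + s));
        // prints [3:1000, 3:2000, 5:1000, 5:2000]

        // An empty side means the inner loop never runs, so nothing is emitted
        System.out.println(coGroup(grades, List.<Integer>of(), (g, s) -> g + ":" + s));
        // prints []
    }
}
```

    If unmatched elements must also be emitted (outer-join behaviour), one option is to call coGroup directly with a custom CoGroupFunction, since it receives both iterables even when one is empty.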
    

    How retraction is implemented

    https://blog.csdn.net/a1240466196/article/details/109975823

    IntervalJoin

    https://www.jianshu.com/p/b807c5ffee48
