join

Author: ZYvette | Published 2021-05-07 20:43

    Join operations

    Window join

    Usage

    stream1.join(stream2)
        .where(<stream1KeySelector>)
        .equalTo(<stream2KeySelector>)
        .window(<WindowAssigner>)
        .apply(<JoinFunction>)
    

    This is roughly equivalent to the following SQL, evaluated separately for each window:
    stream1 join stream2 where stream1.key=stream2.key
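As a rough mental model of the SQL analogy, here is a plain-Java sketch (no Flink involved) of what a tumbling window join computes. Each element is assigned to the window [start, start + size) that contains its timestamp, and only pairs with equal keys that land in the same window are emitted. The Event class and the windowJoin/windowStart methods are hypothetical names; timestamps are assumed non-negative and the window offset zero.

```java
import java.util.ArrayList;
import java.util.List;

class TumblingWindowJoinSketch {
    // Hypothetical event type: a key, an event-time timestamp (ms) and a payload.
    static final class Event {
        final String key;
        final long ts;
        final String value;

        Event(String key, long ts, String value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }
    }

    // Start of the tumbling window containing ts (assumes ts >= 0 and zero offset).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Inner join: emit "left,right" for every pair with an equal key in the same window.
    static List<String> windowJoin(List<Event> left, List<Event> right, long size) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameWindow = windowStart(l.ts, size) == windowStart(r.ts, size);
                if (sameWindow && l.key.equals(r.key)) {
                    out.add(l.value + "," + r.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = List.of(new Event("a", 1, "L1"), new Event("a", 6, "L2"));
        List<Event> right = List.of(new Event("a", 3, "R1"), new Event("b", 4, "R2"));
        // Window size 5: L1 (ts 1) and R1 (ts 3) share window [0, 5); L2 (ts 6) is in [5, 10).
        System.out.println(windowJoin(left, right, 5)); // [L1,R1]
    }
}
```

L1 and R2 also share a window, but their keys differ, so no pair is produced.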

    There are three kinds of window join:

    • Tumbling Window Join
    • Sliding Window Join
    • Session Window Join
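The three variants differ only in how elements are assigned to windows. The sketch below (plain Java, hypothetical helper names, zero window offset assumed) lists the windows a timestamp belongs to for tumbling and sliding windows. Session windows are left out because their boundaries are not fixed in advance; they are formed by merging elements that are closer together than the session gap.

```java
import java.util.ArrayList;
import java.util.List;

class WindowAssignmentSketch {
    // Start of the aligned window of length `size` that contains ts (zero offset assumed).
    // Math.floorMod keeps the result correct for negative timestamps as well.
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Tumbling windows: each element belongs to exactly one window [start, start + size).
    static List<String> tumbling(long ts, long size) {
        long start = windowStart(ts, size);
        return List.of("[" + start + "," + (start + size) + ")");
    }

    // Sliding windows: walk back from the latest slide-aligned start and keep every
    // window [start, start + size) that still contains ts, i.e. while start > ts - size.
    static List<String> sliding(long ts, long size, long slide) {
        List<String> windows = new ArrayList<>();
        for (long start = windowStart(ts, slide); start > ts - size; start -= slide) {
            windows.add("[" + start + "," + (start + size) + ")");
        }
        return windows;
    }

    public static void main(String[] args) {
        System.out.println(tumbling(7, 5));    // [[5,10)]
        System.out.println(sliding(7, 10, 5)); // [[5,15), [0,10)]
    }
}
```

Because one element can fall into several sliding windows, a sliding window join may emit the same pair once for every window both elements share.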

    Source code analysis


    Window join

    public <T> DataStream<T> apply(
            JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
        // clean the closure
        function = input1.getExecutionEnvironment().clean(function);

        // the window join is built on coGroup, reusing the same keys, window assigner,
        // trigger, evictor and allowed lateness
        coGroupedWindowedStream =
                input1.coGroup(input2)
                        .where(keySelector1)
                        .equalTo(keySelector2)
                        .window(windowAssigner)
                        .trigger(trigger)
                        .evictor(evictor)
                        .allowedLateness(allowedLateness);

        // JoinCoGroupFunction adapts the user's JoinFunction to the CoGroupFunction interface
        return coGroupedWindowedStream.apply(new JoinCoGroupFunction<>(function), resultType);
    }
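The apply method shows that a window join is a coGroup whose function is wrapped in JoinCoGroupFunction. A Flink-free sketch of what that wrapper amounts to for one key in one window, assuming java.util.function.BiFunction in place of Flink's JoinFunction and a returned list in place of a Collector: every left element is paired with every right element. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

class JoinViaCoGroupSketch {
    // Models one coGroup call: all left and right elements for a single key in a single window.
    // The join function is applied to every left/right pair (a nested loop, i.e. a cross product).
    static <L, R, T> List<T> coGroupAsJoin(
            Iterable<L> first, Iterable<R> second, BiFunction<L, R, T> join) {
        List<T> out = new ArrayList<>();
        for (L l : first) {
            for (R r : second) {
                out.add(join.apply(l, r));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Two left elements and one right element -> 2 x 1 = 2 joined results.
        List<String> joined = coGroupAsJoin(List.of(1, 2), List.of("x"), (l, r) -> l + r);
        System.out.println(joined); // [1x, 2x]
        // No right elements for this key/window -> nothing is emitted.
        List<String> none = coGroupAsJoin(List.of(1, 2), List.<String>of(), (l, r) -> l + r);
        System.out.println(none); // []
    }
}
```

The nested loop is what makes the result an inner join: if one side is empty for a given key and window, nothing is emitted.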
    

    CoGroup

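    CoGroup (co-grouping) combines two different DataStreams: within the same window, elements from both streams that share the same key are grouped together and processed in a single call.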

            public <T> DataStream<T> apply(
                    CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
                // clean the closure
                function = input1.getExecutionEnvironment().clean(function);
    
                UnionTypeInfo<T1, T2> unionType =
                        new UnionTypeInfo<>(input1.getType(), input2.getType());
                // union key selector: picks the key selector that matches the element's source stream
                UnionKeySelector<T1, T2, KEY> unionKeySelector =
                        new UnionKeySelector<>(keySelector1, keySelector2);
    
                DataStream<TaggedUnion<T1, T2>> taggedInput1 =
                        input1.map(new Input1Tagger<T1, T2>())
                                .setParallelism(input1.getParallelism())
                                .returns(unionType);
                DataStream<TaggedUnion<T1, T2>> taggedInput2 =
                        input2.map(new Input2Tagger<T1, T2>())
                                .setParallelism(input2.getParallelism())
                                .returns(unionType);
    
                DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
    
                // we explicitly create the keyed stream to manually pass the key type information in
                windowedStream =
                        new KeyedStream<TaggedUnion<T1, T2>, KEY>(
                                        unionStream, unionKeySelector, keyType)
                                .window(windowAssigner);
    
                if (trigger != null) {
                    windowedStream.trigger(trigger);
                }
                if (evictor != null) {
                    windowedStream.evictor(evictor);
                }
                if (allowedLateness != null) {
                    windowedStream.allowedLateness(allowedLateness);
                }
    
                return windowedStream.apply(
                        new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
            }
    
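    The UnionKeySelector class:

    public KEY getKey(TaggedUnion<T1, T2> value) throws Exception {
        // elements from stream 1 use the where() selector, elements from stream 2 the equalTo() selector
        if (value.isOne()) {
            return keySelector1.getKey(value.getOne());
        } else {
            return keySelector2.getKey(value.getTwo());
        }
    }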
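The tag-and-dispatch idea behind UnionKeySelector can be reproduced in a few lines of plain Java. Tagged, ofOne/ofTwo and unionKeySelector below are hypothetical stand-ins for TaggedUnion and UnionKeySelector, with java.util.function.Function in place of Flink's KeySelector; the sketch assumes element values are never null, since the tag is inferred from which field is set.

```java
import java.util.function.Function;

class UnionKeySelectorSketch {
    // Hypothetical stand-in for TaggedUnion: exactly one of `one` / `two` is non-null.
    static final class Tagged<T1, T2> {
        final T1 one;
        final T2 two;

        private Tagged(T1 one, T2 two) {
            this.one = one;
            this.two = two;
        }

        static <T1, T2> Tagged<T1, T2> ofOne(T1 v) { return new Tagged<>(v, null); }
        static <T1, T2> Tagged<T1, T2> ofTwo(T2 v) { return new Tagged<>(null, v); }
        boolean isOne() { return one != null; }
    }

    // Same dispatch as getKey above: use the selector that matches the element's tag.
    static <T1, T2, K> Function<Tagged<T1, T2>, K> unionKeySelector(
            Function<T1, K> keySelector1, Function<T2, K> keySelector2) {
        return v -> v.isOne() ? keySelector1.apply(v.one) : keySelector2.apply(v.two);
    }

    public static void main(String[] args) {
        // Stream 1 carries "key:payload" strings, stream 2 carries integers keyed by parity.
        Function<Tagged<String, Integer>, String> key =
                unionKeySelector(s -> s.split(":")[0], i -> i % 2 == 0 ? "even" : "odd");
        System.out.println(key.apply(Tagged.ofOne("even:hello"))); // even
        System.out.println(key.apply(Tagged.ofTwo(4)));            // even
    }
}
```

Both sides end up with keys of the same type, which is what allows the unioned stream to be keyed with a single key selector.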
    
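    • Each of the two DataStreams is tagged so that its elements can be told apart, producing TaggedUnion values; a TaggedUnion has two fields, one and two, corresponding to the first and second stream.
    • The two tagged streams are merged with union into a single DataStream, unionStream.
    • unionStream is keyed with keyBy, using the where key selector for elements of the first stream and the equalTo key selector for elements of the second, which yields a KeyedStream.
    • The configured window assigner turns it into a WindowedStream, on which the user function, wrapped in a CoGroupWindowFunction, is applied; from there on, processing is ordinary window processing.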
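Once the tagged elements of one key have been collected in a window, they have to be split back into the two original sides before the user's function is called. A minimal plain-Java sketch of that step, assuming a hypothetical boolean tag and a BiFunction that returns a value instead of writing to a Flink Collector (the names are illustrative, not Flink's):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

class CoGroupWindowSplitSketch {
    // Hypothetical tagged element: fromFirst == true means it came from stream 1.
    static final class Tagged {
        final boolean fromFirst;
        final String value;

        Tagged(boolean fromFirst, String value) {
            this.fromFirst = fromFirst;
            this.value = value;
        }
    }

    // For one key and one window: split the unioned elements by tag, then hand
    // both sides to the co-group function in a single call.
    static <T> T applyCoGroup(
            Iterable<Tagged> windowContents, BiFunction<List<String>, List<String>, T> coGroup) {
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        for (Tagged t : windowContents) {
            if (t.fromFirst) {
                first.add(t.value);
            } else {
                second.add(t.value);
            }
        }
        return coGroup.apply(first, second);
    }

    public static void main(String[] args) {
        List<Tagged> window = List.of(
                new Tagged(true, "L1"), new Tagged(false, "R1"), new Tagged(true, "L2"));
        String summary = applyCoGroup(window, (l, r) -> l.size() + " left / " + r.size() + " right");
        System.out.println(summary); // 2 left / 1 right
    }
}
```

Because the function sees both sides at once, a coGroup can also express things a plain join cannot, such as emitting elements that have no partner on the other side.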

    Interval join


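    Two elements with the same key are joined when the timestamp of the green (right) element falls within an interval relative to the timestamp of the orange (left) element:
    orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound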
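The interval condition is just a pair of inequalities on event timestamps. A small Java helper that evaluates it, assuming both bounds are inclusive (Flink's default for between, which can be changed with lowerBoundExclusive() / upperBoundExclusive()) and using lowerBound = -2, upperBound = 1 as example values; the class and method names are hypothetical:

```java
class IntervalConditionSketch {
    // True if a green (right) element at greenTs joins an orange (left) element at orangeTs,
    // with both bounds inclusive: orangeTs + lowerBound <= greenTs <= orangeTs + upperBound.
    static boolean inInterval(long orangeTs, long greenTs, long lowerBound, long upperBound) {
        return orangeTs + lowerBound <= greenTs && greenTs <= orangeTs + upperBound;
    }

    public static void main(String[] args) {
        long lower = -2, upper = 1;
        long orangeTs = 5; // joins green elements with timestamps in [3, 6]
        // prints true for green timestamps 3 through 6, false for 2 and 7
        for (long greenTs = 2; greenTs <= 7; greenTs++) {
            System.out.println(greenTs + " -> " + inInterval(orangeTs, greenTs, lower, upper));
        }
    }
}
```

Note that the interval can reach into the past (negative lowerBound) as well as the future, so a green element may arrive before or after the orange element it joins with.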

    Usage

    orangeStream
        .keyBy(<KeySelector>)
        .intervalJoin(greenStream.keyBy(<KeySelector>))
        .between(Time.milliseconds(-2), Time.milliseconds(1))
        .process(<ProcessJoinFunction>)
    

    Source implementation

            @PublicEvolving
            public <OUT> SingleOutputStreamOperator<OUT> process(
                    ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
                    TypeInformation<OUT> outputType) {
                Preconditions.checkNotNull(processJoinFunction);
                Preconditions.checkNotNull(outputType);
    
                final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf =
                        left.getExecutionEnvironment().clean(processJoinFunction);
    
                final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
                        new IntervalJoinOperator<>(
                                lowerBound,
                                upperBound,
                                lowerBoundInclusive,
                                upperBoundInclusive,
                                left.getType().createSerializer(left.getExecutionConfig()),
                                right.getType().createSerializer(right.getExecutionConfig()),
                                cleanedUdf);
    
                return left.connect(right)
                        .keyBy(keySelector1, keySelector2)
                        .transform("Interval Join", outputType, operator);
            }
    

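    As the code shows, the two streams are connected with connect, keyed with their respective key selectors, and processed by an IntervalJoinOperator. The operator keeps one state per input to buffer that side's elements, and emits a result whenever a pair of elements falls within the interval.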

    IntervalJoinOperator

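    • initializeState: initializes the state, namely leftBuffer and rightBuffer.
      Buffer type: MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>>,
      i.e. a map from timestamp to List<BufferEntry> (all elements of one side seen at that timestamp).
    • The actual computation: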
        @Override
        public void processElement1(StreamRecord<T1> record) throws Exception {
            // left element: stored in leftBuffer and probed against rightBuffer
            processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
        }
    
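        @Override
        public void processElement2(StreamRecord<T2> record) throws Exception {
            // right element: stored in rightBuffer and probed against leftBuffer; the bounds are
            // negated and swapped because the interval is defined relative to the left side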
            processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
        }
    
        private <THIS, OTHER> void processElement(
                final StreamRecord<THIS> record,
                final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
                final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
                final long relativeLowerBound,
                final long relativeUpperBound,
                final boolean isLeft)
                throws Exception {
    
            final THIS ourValue = record.getValue();
            final long ourTimestamp = record.getTimestamp();
    
            if (ourTimestamp == Long.MIN_VALUE) {
                throw new FlinkException(
                        "Long.MIN_VALUE timestamp: Elements used in "
                                + "interval stream joins need to have timestamps meaningful timestamps.");
            }
    
            if (isLate(ourTimestamp)) {
                return;
            }
            // store the element in this side's state
            addToBuffer(ourBuffer, ourValue, ourTimestamp);
    
            for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket : otherBuffer.entries()) {
                final long timestamp = bucket.getKey();
    
                if (timestamp < ourTimestamp + relativeLowerBound
                        || timestamp > ourTimestamp + relativeUpperBound) {
                    continue;
                }
                // every incoming element is immediately joined with the buffered elements of the other side in this bucket
                for (BufferEntry<OTHER> entry : bucket.getValue()) {
                    if (isLeft) {
                        collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
                    } else {
                        collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
                    }
                }
            }
    
            long cleanupTime =
                    (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
            // register an event-time timer that cleans up this element's state once it expires (event time only)
            if (isLeft) {
                internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
            } else {
                internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
            }
        }
    
    
        // append the element to the list buffered under its timestamp
        private static <T> void addToBuffer(
                final MapState<Long, List<IntervalJoinOperator.BufferEntry<T>>> buffer,
                final T value,
                final long timestamp)
                throws Exception {
            List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
            if (elemsInBucket == null) {
                elemsInBucket = new ArrayList<>();
            }
            elemsInBucket.add(new BufferEntry<>(value, false));
            buffer.put(timestamp, elemsInBucket);
        }
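The buffering and probing logic of processElement can be replayed outside Flink with two in-memory maps. The sketch below is a simplification under stated assumptions: a single key, plain HashMaps instead of MapState, no lateness check and no cleanup timers; the class and method names are hypothetical. It keeps the one non-obvious trick from the source, namely that the right side reuses the same routine with the bounds negated and swapped (-upperBound, -lowerBound).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class IntervalJoinBufferSketch {
    private final long lowerBound;
    private final long upperBound;
    // timestamp -> elements with that timestamp, like the MapState buffers (single key assumed)
    private final Map<Long, List<String>> leftBuffer = new HashMap<>();
    private final Map<Long, List<String>> rightBuffer = new HashMap<>();
    final List<String> output = new ArrayList<>();

    IntervalJoinBufferSketch(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    void processLeft(long ts, String value) {
        process(ts, value, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    // Right side: bounds are negated and swapped, as in processElement2.
    void processRight(long ts, String value) {
        process(ts, value, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    private void process(long ourTs, String ourValue,
                         Map<Long, List<String>> ourBuffer, Map<Long, List<String>> otherBuffer,
                         long relLower, long relUpper, boolean isLeft) {
        // 1. buffer the element under its timestamp (like addToBuffer)
        ourBuffer.computeIfAbsent(ourTs, t -> new ArrayList<>()).add(ourValue);
        // 2. probe every bucket of the other side, skipping those outside the relative interval
        for (Map.Entry<Long, List<String>> bucket : otherBuffer.entrySet()) {
            long ts = bucket.getKey();
            if (ts < ourTs + relLower || ts > ourTs + relUpper) {
                continue;
            }
            for (String other : bucket.getValue()) {
                // always emit as "left,right", regardless of which side arrived last
                output.add(isLeft ? ourValue + "," + other : other + "," + ourValue);
            }
        }
    }

    public static void main(String[] args) {
        IntervalJoinBufferSketch join = new IntervalJoinBufferSketch(-2, 1);
        join.processRight(6, "R6"); // nothing buffered on the left yet
        join.processLeft(5, "L5");  // R6 is within [5 - 2, 5 + 1] = [3, 6] -> emits L5,R6
        join.processRight(2, "R2"); // left ts must be in [2 - 1, 2 + 2] = [1, 4]; L5 is not
        System.out.println(join.output); // [L5,R6]
    }
}
```

The arrival order does not matter: whichever element of a matching pair arrives second finds the first one in the other buffer, which is why every element has to stay buffered until its cleanup timer fires.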
    

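    Notes:
    a. Interval join supports event time only.
    b. Every incoming element triggers one computation against the buffered elements of the other stream that fall within its interval.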
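Since state is only cleaned up by event-time timers, it helps to work out when such a timer fires. The snippet below reproduces only the cleanupTime expression from processElement (the timer callback that actually removes buffered data is not part of the excerpt above); the class name is hypothetical and the bounds -2 and 1 are example values.

```java
class IntervalJoinCleanupSketch {
    // Mirrors: (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp
    static long cleanupTime(long ts, long relativeUpperBound) {
        return relativeUpperBound > 0L ? ts + relativeUpperBound : ts;
    }

    public static void main(String[] args) {
        long lowerBound = -2, upperBound = 1;
        // Left element at 10: its relative upper bound is upperBound = 1.
        System.out.println(cleanupTime(10, upperBound));  // 11
        // Right element at 10: its relative upper bound is -lowerBound = 2.
        System.out.println(cleanupTime(10, -lowerBound)); // 12
    }
}
```

With these bounds, a left element at 10 can match right elements up to timestamp 11, and a right element at 10 can match left elements up to timestamp 12, so the timers are placed at the latest timestamp a partner could still have.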
