Flink IntervalJoin

Author: kaiker | Published 2021-10-27 12:20

KeyedStream

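IntervalJoin is applied to a KeyedStream: both inputs must already be keyed, and the join only runs in event time (see the check in between).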

public static class IntervalJoin<T1, T2, KEY> {
        private final KeyedStream<T1, KEY> streamOne;
        private final KeyedStream<T2, KEY> streamTwo;

        IntervalJoin(KeyedStream<T1, KEY> streamOne, KeyedStream<T2, KEY> streamTwo) {
            this.streamOne = (KeyedStream)Preconditions.checkNotNull(streamOne);
            this.streamTwo = (KeyedStream)Preconditions.checkNotNull(streamTwo);
        }

        @PublicEvolving
        public KeyedStream.IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) {
            TimeCharacteristic timeCharacteristic = this.streamOne.getExecutionEnvironment().getStreamTimeCharacteristic();
            if (timeCharacteristic != TimeCharacteristic.EventTime) {
                throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
            } else {
                Preconditions.checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join");
                Preconditions.checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join");
                return new KeyedStream.IntervalJoined(this.streamOne, this.streamTwo, lowerBound.toMilliseconds(), upperBound.toMilliseconds(), true, true);
            }
        }
    }

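The IntervalJoined class it returns is shown next. Look at the last line of process: it connects the two streams, keys the connected stream by both join-key selectors, and hands the result to IntervalJoinOperator.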

@PublicEvolving
    public static class IntervalJoined<IN1, IN2, KEY> {
        private final KeyedStream<IN1, KEY> left;
        private final KeyedStream<IN2, KEY> right;
        private final long lowerBound;
        private final long upperBound;
        private final KeySelector<IN1, KEY> keySelector1;
        private final KeySelector<IN2, KEY> keySelector2;
        private boolean lowerBoundInclusive;
        private boolean upperBoundInclusive;

        public IntervalJoined(KeyedStream<IN1, KEY> left, KeyedStream<IN2, KEY> right, long lowerBound, long upperBound, boolean lowerBoundInclusive, boolean upperBoundInclusive) {
            this.left = (KeyedStream)Preconditions.checkNotNull(left);
            this.right = (KeyedStream)Preconditions.checkNotNull(right);
            this.lowerBound = lowerBound;
            this.upperBound = upperBound;
            this.lowerBoundInclusive = lowerBoundInclusive;
            this.upperBoundInclusive = upperBoundInclusive;
            this.keySelector1 = left.getKeySelector();
            this.keySelector2 = right.getKeySelector();
        }

        @PublicEvolving
        public KeyedStream.IntervalJoined<IN1, IN2, KEY> upperBoundExclusive() {
            this.upperBoundInclusive = false;
            return this;
        }

        @PublicEvolving
        public KeyedStream.IntervalJoined<IN1, IN2, KEY> lowerBoundExclusive() {
            this.lowerBoundInclusive = false;
            return this;
        }

        @PublicEvolving
        public <OUT> SingleOutputStreamOperator<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction) {
            Preconditions.checkNotNull(processJoinFunction);
            TypeInformation<OUT> outputType = TypeExtractor.getBinaryOperatorReturnType(processJoinFunction, ProcessJoinFunction.class, 0, 1, 2, TypeExtractor.NO_INDEX, this.left.getType(), this.right.getType(), Utils.getCallLocationName(), true);
            return this.process(processJoinFunction, outputType);
        }

        @PublicEvolving
        public <OUT> SingleOutputStreamOperator<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction, TypeInformation<OUT> outputType) {
            Preconditions.checkNotNull(processJoinFunction);
            Preconditions.checkNotNull(outputType);
            ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = (ProcessJoinFunction)this.left.getExecutionEnvironment().clean(processJoinFunction);
            IntervalJoinOperator<KEY, IN1, IN2, OUT> operator = new IntervalJoinOperator(this.lowerBound, this.upperBound, this.lowerBoundInclusive, this.upperBoundInclusive, this.left.getType().createSerializer(this.left.getExecutionConfig()), this.right.getType().createSerializer(this.right.getExecutionConfig()), cleanedUdf);
            return this.left.connect(this.right).keyBy(this.keySelector1, this.keySelector2).transform("Interval Join", outputType, operator);
        }
    }
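The effect of between, lowerBoundExclusive and upperBoundExclusive can be summarised as a predicate on the two timestamps. The following is a minimal plain-Java sketch, not Flink code: the class and method names are made up for illustration, and it assumes that on millisecond timestamps an exclusive bound is equivalent to shifting that bound by 1 ms.

```java
/** Hypothetical helper, not part of Flink: models the interval-join condition. */
final class IntervalBounds {
    private IntervalBounds() {}

    /**
     * True when rightTs lies in [leftTs + lower, leftTs + upper],
     * with either end optionally made exclusive.
     */
    static boolean matches(long leftTs, long rightTs, long lower, long upper,
                           boolean lowerInclusive, boolean upperInclusive) {
        // Assumption: on integer millisecond timestamps, "exclusive" means moving the bound by 1 ms.
        long lo = lowerInclusive ? lower : lower + 1L;
        long hi = upperInclusive ? upper : upper - 1L;
        return rightTs >= leftTs + lo && rightTs <= leftTs + hi;
    }

    public static void main(String[] args) {
        // between(-2 s, +1 s) with both ends inclusive, the default passed by between().
        System.out.println(matches(10_000L, 8_000L, -2_000L, 1_000L, true, true));
        // Same bounds with an exclusive lower bound: the element exactly 2 s earlier is checked again.
        System.out.println(matches(10_000L, 8_000L, -2_000L, 1_000L, false, true));
    }
}
```

Read this way, between(lowerBound, upperBound) says "a right element joins a left element if its timestamp is no earlier than left + lowerBound and no later than left + upperBound".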

IntervalJoinOperator

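  • IntervalJoinOperator wraps the user's ProcessJoinFunction (it extends AbstractUdfStreamOperator rather than ProcessJoinFunction itself) and implements TwoInputStreamOperator. Because it has two inputs it defines processElement1 and processElement2, and both delegate to the same processElement logic.
  • processElement first calls addToBuffer, which stores the incoming element in its own side's buffer. The buffer is a MapState keyed by timestamp; each value is the list of elements that arrived with that timestamp.
  • Iterator var12 = otherBuffer.entries().iterator(); reads the other stream's buffer. Once every bucket has been scanned, a cleanup timer is registered for cleanupTime (long cleanupTime = relativeUpperBound > 0L ? ourTimestamp + relativeUpperBound : ourTimestamp;). When it fires, the bucket that was buffered for this element's timestamp is removed (see onEventTime).
  • Where does the firing time come from? timer.getTimestamp() returns the time the timer was registered for. Pending timers sit in a priority queue ordered by the comparator below, so the earliest one fires first, and an event-time timer fires only once the operator's watermark has passed it. For a two-input operator that watermark is the smaller of the two inputs' watermarks.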
    PriorityComparator<InternalTimer<?, ?>> TIMER_COMPARATOR = (left, right) -> {
        return Long.compare(left.getTimestamp(), right.getTimestamp());
    };
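  • Further down, var16 iterates over the elements of one matching otherBuffer bucket. Each pair is passed to collect, which calls the ProcessJoinFunction to perform the join. collect also handles timestamps: resultTimestamp is the larger of the two sides' timestamps and becomes the timestamp of the emitted joined record.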
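The timer behaviour described in the list above can be sketched in plain Java. This is a toy model, not Flink's InternalTimerService: the class name and methods are invented for illustration, it ignores keys and namespaces, and it assumes the operator's event time is simply the minimum of the two input watermarks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model, not Flink code: event-time timers on a two-input operator. */
final class ToyTimerService {
    // Earliest timer first, the same ordering TIMER_COMPARATOR gives.
    private final PriorityQueue<Long> timers = new PriorityQueue<>(Long::compare);
    private long watermark1 = Long.MIN_VALUE;
    private long watermark2 = Long.MIN_VALUE;

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Advances the watermark of input 1 or 2 and returns the timers that fire, in order. */
    List<Long> advanceWatermark(int input, long watermark) {
        if (input == 1) {
            watermark1 = Math.max(watermark1, watermark);
        } else {
            watermark2 = Math.max(watermark2, watermark);
        }
        // The operator's event time is held back by the slower input.
        long combined = Math.min(watermark1, watermark2);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= combined) {
            fired.add(timers.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        ToyTimerService service = new ToyTimerService();
        service.registerEventTimeTimer(300L);
        service.registerEventTimeTimer(100L);
        // Input 2 has not sent a watermark yet, so nothing can fire.
        System.out.println(service.advanceWatermark(1, 500L));
        // The combined watermark becomes 200, so only the timer at 100 fires.
        System.out.println(service.advanceWatermark(2, 200L));
    }
}
```

The practical consequence is that a slow input delays cleanup on both sides, so buffered state grows until the lagging stream's watermark catches up.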

Core processing logic

processElement

    public void processElement1(StreamRecord<T1> record) throws Exception {
        this.processElement(record, this.leftBuffer, this.rightBuffer, this.lowerBound, this.upperBound, true);
    }

    public void processElement2(StreamRecord<T2> record) throws Exception {
        this.processElement(record, this.rightBuffer, this.leftBuffer, -this.upperBound, -this.lowerBound, false);
    }

    private <THIS, OTHER> void processElement(StreamRecord<THIS> record, MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer, MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer, long relativeLowerBound, long relativeUpperBound, boolean isLeft) throws Exception {
        THIS ourValue = record.getValue();
        long ourTimestamp = record.getTimestamp();
        if (ourTimestamp == Long.MIN_VALUE) {
            throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in interval stream joins need to have timestamps meaningful timestamps.");
        } else if (!this.isLate(ourTimestamp)) {
            addToBuffer(ourBuffer, ourValue, ourTimestamp);
            Iterator var12 = otherBuffer.entries().iterator();

            while(true) {
                Entry bucket;
                long timestamp;
                do {
                    do {
                        if (!var12.hasNext()) {
                            long cleanupTime = relativeUpperBound > 0L ? ourTimestamp + relativeUpperBound : ourTimestamp;
                            if (isLeft) {
                                this.internalTimerService.registerEventTimeTimer("CLEANUP_LEFT", cleanupTime);
                            } else {
                                this.internalTimerService.registerEventTimeTimer("CLEANUP_RIGHT", cleanupTime);
                            }

                            return;
                        }

                        bucket = (Entry)var12.next();
                        timestamp = (Long)bucket.getKey();
                    } while(timestamp < ourTimestamp + relativeLowerBound);
                } while(timestamp > ourTimestamp + relativeUpperBound);

                Iterator var16 = ((List)bucket.getValue()).iterator();

                while(var16.hasNext()) {
                    IntervalJoinOperator.BufferEntry<OTHER> entry = (IntervalJoinOperator.BufferEntry)var16.next();
                    if (isLeft) {
                        this.collect(ourValue, entry.element, ourTimestamp, timestamp);
                    } else {
                        this.collect(entry.element, ourValue, timestamp, ourTimestamp);
                    }
                }
            }
        }
    }
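Stripped of the decompiler's nested do/while loops, the matching logic above comes down to "buffer my element, then scan the other side for timestamps inside my interval". The following is a toy single-key model in plain Java, not the operator itself: the class name is hypothetical, in-memory maps stand in for MapState, and the late-element check and cleanup timers are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy single-key model of the matching loop, not Flink code. */
final class ToyIntervalJoin {
    private final Map<Long, List<String>> leftBuffer = new TreeMap<>();
    private final Map<Long, List<String>> rightBuffer = new TreeMap<>();
    private final long lowerBound;
    private final long upperBound;
    final List<String> output = new ArrayList<>();

    ToyIntervalJoin(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    void processLeft(String value, long ts) {
        process(value, ts, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    void processRight(String value, long ts) {
        // Seen from the right side the interval is mirrored: [-upperBound, -lowerBound].
        process(value, ts, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    private void process(String value, long ts,
                         Map<Long, List<String>> ours, Map<Long, List<String>> others,
                         long relativeLower, long relativeUpper, boolean isLeft) {
        // Step 1: buffer our own element under its timestamp.
        ours.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        // Step 2: scan the other side and emit every element whose timestamp is in range.
        for (Map.Entry<Long, List<String>> bucket : others.entrySet()) {
            long otherTs = bucket.getKey();
            if (otherTs < ts + relativeLower || otherTs > ts + relativeUpper) {
                continue;
            }
            for (String other : bucket.getValue()) {
                // Always emit as left+right, whichever side triggered the match.
                output.add(isLeft ? value + "+" + other : other + "+" + value);
            }
        }
    }

    public static void main(String[] args) {
        ToyIntervalJoin join = new ToyIntervalJoin(-2L, 1L);
        join.processLeft("a", 10L);
        join.processRight("x", 9L);
        join.processRight("y", 12L);
        join.processLeft("b", 11L);
        System.out.println(join.output);
    }
}
```

Because each element is buffered before the scan and only the other side is scanned, every matching pair is emitted exactly once, by whichever of the two elements arrives second.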

Buffering stream elements in MapState

addToBuffer

    private static <T> void addToBuffer(MapState<Long, List<IntervalJoinOperator.BufferEntry<T>>> buffer, T value, long timestamp) throws Exception {
        List<IntervalJoinOperator.BufferEntry<T>> elemsInBucket = (List)buffer.get(timestamp);
        if (elemsInBucket == null) {
            elemsInBucket = new ArrayList();
        }

        ((List)elemsInBucket).add(new IntervalJoinOperator.BufferEntry(value, false));
        buffer.put(timestamp, elemsInBucket);
    }
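addToBuffer reads the bucket, appends to it, and then writes it back with put. One plausible reason for the write-back, stated here as my reading rather than something the source says: with a state backend that stores serialized data, get hands out a deserialized copy, so changes to the list are lost unless it is put back. The toy class below imitates that behaviour in plain Java; CopyingBuffer is hypothetical and is not Flink's MapState.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for a state backend that hands out copies, not Flink's MapState. */
final class CopyingBuffer {
    private final Map<Long, List<String>> stored = new HashMap<>();

    /** Returns a copy of the bucket, or null when the bucket does not exist. */
    List<String> get(long timestamp) {
        List<String> bucket = stored.get(timestamp);
        return bucket == null ? null : new ArrayList<>(bucket);
    }

    /** Stores a copy of the given bucket, replacing any previous one. */
    void put(long timestamp, List<String> bucket) {
        stored.put(timestamp, new ArrayList<>(bucket));
    }

    /** Same read-modify-write shape as addToBuffer. */
    static void addToBuffer(CopyingBuffer buffer, String value, long timestamp) {
        List<String> elemsInBucket = buffer.get(timestamp);
        if (elemsInBucket == null) {
            elemsInBucket = new ArrayList<>();
        }
        elemsInBucket.add(value);
        // Without this write-back the appended element would only exist in the local copy.
        buffer.put(timestamp, elemsInBucket);
    }

    public static void main(String[] args) {
        CopyingBuffer buffer = new CopyingBuffer();
        addToBuffer(buffer, "a", 10L);
        addToBuffer(buffer, "b", 10L);
        System.out.println(buffer.get(10L));
    }
}
```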

Timers that clear the buffers

onEventTime

    public void onEventTime(InternalTimer<K, String> timer) throws Exception {
        long timerTimestamp = timer.getTimestamp();
        String namespace = timer.getNamespace();
        logger.trace("onEventTime @ {}", timerTimestamp);

        long timestamp;
        switch (namespace) {
        case "CLEANUP_LEFT":
            // Undo the offset that was added when the left-side timer was registered.
            timestamp = this.upperBound <= 0L ? timerTimestamp : timerTimestamp - this.upperBound;
            logger.trace("Removing from left buffer @ {}", timestamp);
            this.leftBuffer.remove(timestamp);
            break;
        case "CLEANUP_RIGHT":
            // Undo the offset that was added when the right-side timer was registered.
            timestamp = this.lowerBound <= 0L ? timerTimestamp + this.lowerBound : timerTimestamp;
            logger.trace("Removing from right buffer @ {}", timestamp);
            this.rightBuffer.remove(timestamp);
            break;
        default:
            throw new RuntimeException("Invalid namespace " + namespace);
        }
    }
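The arithmetic in onEventTime is the inverse of the cleanupTime calculation in processElement: the timer is registered at the element's timestamp plus an offset, and the handler subtracts that offset again to find the bucket to remove. A small plain-Java sketch of just that arithmetic follows; the helper class and method names are hypothetical, and only the formulas are taken from the code above.

```java
/** Hypothetical helper, not Flink code: the cleanup-timer arithmetic in isolation. */
final class CleanupTimes {
    private CleanupTimes() {}

    /** Timer time for a left element; processElement1 passes relativeUpperBound = upperBound. */
    static long leftTimerFor(long elementTs, long upperBound) {
        return upperBound > 0L ? elementTs + upperBound : elementTs;
    }

    /** Timer time for a right element; processElement2 passes relativeUpperBound = -lowerBound. */
    static long rightTimerFor(long elementTs, long lowerBound) {
        long relativeUpperBound = -lowerBound;
        return relativeUpperBound > 0L ? elementTs + relativeUpperBound : elementTs;
    }

    /** Bucket removed from the left buffer when a CLEANUP_LEFT timer fires. */
    static long leftBucketFor(long timerTs, long upperBound) {
        return upperBound <= 0L ? timerTs : timerTs - upperBound;
    }

    /** Bucket removed from the right buffer when a CLEANUP_RIGHT timer fires. */
    static long rightBucketFor(long timerTs, long lowerBound) {
        return lowerBound <= 0L ? timerTs + lowerBound : timerTs;
    }

    public static void main(String[] args) {
        // With between(-5, 5), a left element at t=1000 is kept until the watermark reaches 1005.
        long timer = leftTimerFor(1000L, 5L);
        System.out.println(timer + " -> bucket " + leftBucketFor(timer, 5L));
    }
}
```

For any choice of bounds, mapping an element timestamp to its timer and back yields the original timestamp, which is why remove(timestamp) hits exactly the bucket the element was stored in.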

Handing the pair to the actual join function

collect

    private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
        long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
        this.collector.setAbsoluteTimestamp(resultTimestamp);
        this.context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);
        ((ProcessJoinFunction)this.userFunction).processElement(left, right, this.context, this.collector);
    }

output

    public <X> void output(OutputTag<X> outputTag, X value) {
        Preconditions.checkArgument(outputTag != null, "OutputTag must not be null");
        IntervalJoinOperator.this.output.collect(outputTag, new StreamRecord(value, this.getTimestamp()));
    }
