KeyedStream
IntervalJoin是在KeyedStream上使用的
/**
 * Entry point for configuring a time-bounded (interval) join between two keyed streams
 * that share the same key type.
 *
 * @param <T1> element type of the first stream
 * @param <T2> element type of the second stream
 * @param <KEY> key type of both streams
 */
public static class IntervalJoin<T1, T2, KEY> {

    private final KeyedStream<T1, KEY> streamOne;
    private final KeyedStream<T2, KEY> streamTwo;

    /**
     * Stores the two streams to join.
     *
     * @param streamOne first keyed stream, must not be null
     * @param streamTwo second keyed stream, must not be null
     */
    IntervalJoin(KeyedStream<T1, KEY> streamOne, KeyedStream<T2, KEY> streamTwo) {
        this.streamOne = Preconditions.checkNotNull(streamOne);
        this.streamTwo = Preconditions.checkNotNull(streamTwo);
    }

    /**
     * Fixes the time interval of the join. Both bounds are converted to milliseconds and are
     * passed on as inclusive; the returned object offers methods to make either one exclusive.
     *
     * @param lowerBound lower bound of the interval, must not be null
     * @param upperBound upper bound of the interval, must not be null
     * @return the configured join on which a process function can be applied
     * @throws UnsupportedTimeCharacteristicException if the environment of the first stream
     *     is not configured for event time
     */
    @PublicEvolving
    public KeyedStream.IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) {
        final TimeCharacteristic configured =
                streamOne.getExecutionEnvironment().getStreamTimeCharacteristic();
        if (configured != TimeCharacteristic.EventTime) {
            throw new UnsupportedTimeCharacteristicException(
                    "Time-bounded stream joins are only supported in event time");
        }

        Preconditions.checkNotNull(
                lowerBound, "A lower bound needs to be provided for a time-bounded join");
        Preconditions.checkNotNull(
                upperBound, "An upper bound needs to be provided for a time-bounded join");

        final long lowerMillis = lowerBound.toMilliseconds();
        final long upperMillis = upperBound.toMilliseconds();
        return new KeyedStream.IntervalJoined<>(
                streamOne, streamTwo, lowerMillis, upperMillis, true, true);
    }
}
里面用到的IntervalJoined是这部分,可以看下最后一行,connect了两个流,然后把join字段进行keyBy,然后交给了IntervalJoinOperator
/**
 * A configured interval join of two keyed streams: holds both inputs, their key selectors,
 * the interval bounds in milliseconds and whether each bound is inclusive.
 *
 * @param <IN1> element type of the left stream
 * @param <IN2> element type of the right stream
 * @param <KEY> key type of both streams
 */
@PublicEvolving
public static class IntervalJoined<IN1, IN2, KEY> {

    private final KeyedStream<IN1, KEY> left;
    private final KeyedStream<IN2, KEY> right;

    private final long lowerBound;
    private final long upperBound;

    private final KeySelector<IN1, KEY> keySelector1;
    private final KeySelector<IN2, KEY> keySelector2;

    // Mutable on purpose: flipped by lowerBoundExclusive() / upperBoundExclusive().
    private boolean lowerBoundInclusive;
    private boolean upperBoundInclusive;

    /**
     * Creates the join description and captures the key selector of each input.
     *
     * @param left left keyed stream, must not be null
     * @param right right keyed stream, must not be null
     * @param lowerBound lower bound of the interval in milliseconds
     * @param upperBound upper bound of the interval in milliseconds
     * @param lowerBoundInclusive whether the lower bound itself belongs to the interval
     * @param upperBoundInclusive whether the upper bound itself belongs to the interval
     */
    public IntervalJoined(
            KeyedStream<IN1, KEY> left,
            KeyedStream<IN2, KEY> right,
            long lowerBound,
            long upperBound,
            boolean lowerBoundInclusive,
            boolean upperBoundInclusive) {
        this.left = Preconditions.checkNotNull(left);
        this.right = Preconditions.checkNotNull(right);
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
        this.lowerBoundInclusive = lowerBoundInclusive;
        this.upperBoundInclusive = upperBoundInclusive;
        this.keySelector1 = left.getKeySelector();
        this.keySelector2 = right.getKeySelector();
    }

    /**
     * Marks the upper bound as exclusive.
     *
     * @return this instance, for chaining
     */
    @PublicEvolving
    public KeyedStream.IntervalJoined<IN1, IN2, KEY> upperBoundExclusive() {
        upperBoundInclusive = false;
        return this;
    }

    /**
     * Marks the lower bound as exclusive.
     *
     * @return this instance, for chaining
     */
    @PublicEvolving
    public KeyedStream.IntervalJoined<IN1, IN2, KEY> lowerBoundExclusive() {
        lowerBoundInclusive = false;
        return this;
    }

    /**
     * Applies the join function, deriving the output type from the function itself, and
     * delegates to {@link #process(ProcessJoinFunction, TypeInformation)}.
     *
     * @param processJoinFunction user function invoked for every joined pair, must not be null
     * @param <OUT> output element type
     * @return the resulting output stream
     */
    @PublicEvolving
    public <OUT> SingleOutputStreamOperator<OUT> process(
            ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction) {
        Preconditions.checkNotNull(processJoinFunction);

        final TypeInformation<OUT> outputType =
                TypeExtractor.getBinaryOperatorReturnType(
                        processJoinFunction,
                        ProcessJoinFunction.class,
                        0,
                        1,
                        2,
                        TypeExtractor.NO_INDEX,
                        left.getType(),
                        right.getType(),
                        Utils.getCallLocationName(),
                        true);

        return process(processJoinFunction, outputType);
    }

    /**
     * Builds the interval join operator and wires it into the topology: the two inputs are
     * connected, keyed by their respective key selectors, and transformed by the operator
     * under the name "Interval Join".
     *
     * @param processJoinFunction user function invoked for every joined pair, must not be null
     * @param outputType type information of the output elements, must not be null
     * @param <OUT> output element type
     * @return the resulting output stream
     */
    @PublicEvolving
    public <OUT> SingleOutputStreamOperator<OUT> process(
            ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
            TypeInformation<OUT> outputType) {
        Preconditions.checkNotNull(processJoinFunction);
        Preconditions.checkNotNull(outputType);

        final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf =
                left.getExecutionEnvironment().clean(processJoinFunction);

        final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
                new IntervalJoinOperator<>(
                        lowerBound,
                        upperBound,
                        lowerBoundInclusive,
                        upperBoundInclusive,
                        left.getType().createSerializer(left.getExecutionConfig()),
                        right.getType().createSerializer(right.getExecutionConfig()),
                        cleanedUdf);

        return left.connect(right)
                .keyBy(keySelector1, keySelector2)
                .transform("Interval Join", outputType, operator);
    }
}
IntervalJoinOperator
- IntervalJoinOperator 并不是继承 ProcessJoinFunction,而是把用户传入的 ProcessJoinFunction 作为 userFunction 持有(见下面collect方法里的this.userFunction)。因为是双流,所以有processElement1,processElement2,不过都执行的processElement这个逻辑。
- processElement这里面有一个addToBuffer的方法,就是把自己这边的数据放入缓存,这个缓存是个MapState,key是时间戳,value是当前时间戳上的数据。
- Iterator var12 = otherBuffer.entries().iterator();这个读的是另外一个流的缓存,等缓存都遍历完的时候,就会注册一个定时器,这个定时器的触发时间是cleanupTime(
long cleanupTime = relativeUpperBound > 0L ? ourTimestamp + relativeUpperBound : ourTimestamp;
),也就是relativeUpperBound大于0时为ourTimestamp + relativeUpperBound,否则为ourTimestamp;定时器触发后把对应时间戳上的缓存清掉(见onEventTime) - 那触发时间是从哪取的,可以看timer.getTimestamp():从onEventTime里的反推计算可以看出,它对应的就是注册定时器时传入的cleanupTime;下面的TIMER_COMPARATOR只是按这个时间戳对定时器做升序比较,并不是在两个流的时间戳之间取较小值
// Compares two timers by their timestamps, smaller timestamp first.
PriorityComparator<InternalTimer<?, ?>> TIMER_COMPARATOR =
        (first, second) -> Long.compare(first.getTimestamp(), second.getTimestamp());
- 后面有个var16,但是这个还是otherBuffer的内容,可以看到最后交给collect方法,collect方法就会调用processJoinFunction进行关联。另外也有时间戳的操作,resultTimestamp取的是两边最大的值,会在输出用到,作为关联后的时间戳
核心处理逻辑
processElement
/**
 * Handles an element of the left input: it is buffered in the left buffer and joined
 * against the right buffer using the configured bounds unchanged.
 *
 * @param record the incoming left-side record
 * @throws Exception propagated from the shared processing logic
 */
public void processElement1(StreamRecord<T1> record) throws Exception {
    final boolean isLeft = true;
    processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, isLeft);
}
/**
 * Handles an element of the right input: it is buffered in the right buffer and joined
 * against the left buffer. Seen from the right side the interval is mirrored, so the
 * bounds are swapped and negated.
 *
 * @param record the incoming right-side record
 * @throws Exception propagated from the shared processing logic
 */
public void processElement2(StreamRecord<T2> record) throws Exception {
    final long mirroredLowerBound = -upperBound;
    final long mirroredUpperBound = -lowerBound;
    processElement(record, rightBuffer, leftBuffer, mirroredLowerBound, mirroredUpperBound, false);
}
/**
 * Shared logic for both inputs: buffers the incoming element, joins it with every buffered
 * element of the other side whose timestamp lies in
 * {@code [ourTimestamp + relativeLowerBound, ourTimestamp + relativeUpperBound]}, and then
 * registers an event-time timer for cleaning up this element's bucket.
 *
 * <p>Elements for which {@code isLate} returns true are dropped without being buffered,
 * joined or given a cleanup timer.
 *
 * @param record the incoming record
 * @param ourBuffer buffer of the side the record belongs to
 * @param otherBuffer buffer of the opposite side
 * @param relativeLowerBound lower bound relative to the record's timestamp
 * @param relativeUpperBound upper bound relative to the record's timestamp
 * @param isLeft whether the record comes from the left input
 * @param <THIS> element type of the record's side
 * @param <OTHER> element type of the opposite side
 * @throws FlinkException if the record carries {@code Long.MIN_VALUE} as timestamp
 * @throws Exception propagated from state access or the user function
 */
@SuppressWarnings("unchecked")
private <THIS, OTHER> void processElement(
        StreamRecord<THIS> record,
        MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
        MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
        long relativeLowerBound,
        long relativeUpperBound,
        boolean isLeft) throws Exception {
    final THIS ourValue = record.getValue();
    final long ourTimestamp = record.getTimestamp();

    if (ourTimestamp == Long.MIN_VALUE) {
        throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in interval stream joins need to have timestamps meaningful timestamps.");
    }
    if (this.isLate(ourTimestamp)) {
        return;
    }

    addToBuffer(ourBuffer, ourValue, ourTimestamp);

    final long earliestMatch = ourTimestamp + relativeLowerBound;
    final long latestMatch = ourTimestamp + relativeUpperBound;

    for (Entry<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> bucket : otherBuffer.entries()) {
        final long timestamp = bucket.getKey();
        if (timestamp < earliestMatch || timestamp > latestMatch) {
            // Bucket lies outside the join interval of this element.
            continue;
        }

        for (IntervalJoinOperator.BufferEntry<OTHER> entry : bucket.getValue()) {
            // The user function always receives (left, right), whichever side arrived now.
            if (isLeft) {
                this.collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
            } else {
                this.collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
            }
        }
    }

    // With a non-positive relative upper bound the timer is registered at the element's
    // own timestamp, otherwise at the upper end of its join interval.
    final long cleanupTime =
            relativeUpperBound > 0L ? ourTimestamp + relativeUpperBound : ourTimestamp;
    final String cleanupNamespace = isLeft ? "CLEANUP_LEFT" : "CLEANUP_RIGHT";
    this.internalTimerService.registerEventTimeTimer(cleanupNamespace, cleanupTime);
}
缓存流数据到MapState里
addToBuffer
/**
 * Appends a value to the bucket stored under the given timestamp, creating the bucket
 * when none exists yet, and writes the bucket back into the state.
 *
 * @param buffer map state from timestamp to the elements buffered at that timestamp
 * @param value element to buffer
 * @param timestamp timestamp of the element, used as the bucket key
 * @param <T> element type
 * @throws Exception propagated from state access
 */
private static <T> void addToBuffer(
        MapState<Long, List<IntervalJoinOperator.BufferEntry<T>>> buffer,
        T value,
        long timestamp) throws Exception {
    final List<IntervalJoinOperator.BufferEntry<T>> existing = buffer.get(timestamp);
    final List<IntervalJoinOperator.BufferEntry<T>> bucket =
            existing != null ? existing : new ArrayList<>();

    // New entries start with their boolean flag set to false.
    bucket.add(new IntervalJoinOperator.BufferEntry<>(value, false));
    buffer.put(timestamp, bucket);
}
定时器触发时清空缓存
onEventTime
/**
 * Timer callback that removes one bucket from the left or right buffer, selected by the
 * timer's namespace.
 *
 * <p>The bucket key is derived from the timer timestamp by undoing the offset applied
 * when the cleanup timer was registered.
 *
 * @param timer the fired timer; its namespace must be "CLEANUP_LEFT" or "CLEANUP_RIGHT"
 * @throws RuntimeException if the namespace is neither of the two cleanup namespaces
 * @throws Exception propagated from state access
 */
public void onEventTime(InternalTimer<K, String> timer) throws Exception {
    final long timerTimestamp = timer.getTimestamp();
    final String namespace = timer.getNamespace();

    logger.trace("onEventTime @ {}", timerTimestamp);

    switch (namespace) {
        case "CLEANUP_LEFT": {
            final long bucketTimestamp =
                    this.upperBound <= 0L ? timerTimestamp : timerTimestamp - this.upperBound;
            logger.trace("Removing from left buffer @ {}", bucketTimestamp);
            this.leftBuffer.remove(bucketTimestamp);
            break;
        }
        case "CLEANUP_RIGHT": {
            final long bucketTimestamp =
                    this.lowerBound <= 0L ? timerTimestamp + this.lowerBound : timerTimestamp;
            logger.trace("Removing from right buffer @ {}", bucketTimestamp);
            this.rightBuffer.remove(bucketTimestamp);
            break;
        }
        default:
            throw new RuntimeException("Invalid namespace " + namespace);
    }
}
交给真正的joinFunction做join
collect
/**
 * Hands one joined pair to the user function. The result timestamp is the larger of the
 * two input timestamps; it is set on the collector and, together with both input
 * timestamps, on the context before the user function is invoked.
 *
 * @param left element from the left input
 * @param right element from the right input
 * @param leftTimestamp timestamp of the left element
 * @param rightTimestamp timestamp of the right element
 * @throws Exception propagated from the user function
 */
private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
    final long resultTimestamp =
            leftTimestamp >= rightTimestamp ? leftTimestamp : rightTimestamp;

    collector.setAbsoluteTimestamp(resultTimestamp);
    context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);

    userFunction.processElement(left, right, context, collector);
}
output
/**
 * Emits a value to the side output identified by the given tag, wrapped in a record that
 * carries the timestamp returned by {@code getTimestamp()}.
 *
 * @param outputTag tag of the side output, must not be null
 * @param value value to emit
 * @param <X> type of the side-output value
 * @throws IllegalArgumentException if {@code outputTag} is null
 */
public <X> void output(OutputTag<X> outputTag, X value) {
    Preconditions.checkArgument(outputTag != null, "OutputTag must not be null");

    final StreamRecord<X> sideRecord = new StreamRecord<>(value, getTimestamp());
    IntervalJoinOperator.this.output.collect(outputTag, sideRecord);
}
网友评论