JoinedStreams
The runWindowJoin example from the Flink source code:
public static DataStream<Tuple3<String, Integer, Integer>> runWindowJoin(
        DataStream<Tuple2<String, Integer>> grades,
        DataStream<Tuple2<String, Integer>> salaries,
        long windowSize) {
    return grades.join(salaries)
            .where(new NameKeySelector())
            .equalTo(new NameKeySelector())
            .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
            .apply(
                    new JoinFunction<
                            Tuple2<String, Integer>,
                            Tuple2<String, Integer>,
                            Tuple3<String, Integer, Integer>>() {
                        @Override
                        public Tuple3<String, Integer, Integer> join(
                                Tuple2<String, Integer> first, Tuple2<String, Integer> second) {
                            return new Tuple3<String, Integer, Integer>(
                                    first.f0, first.f1, second.f1);
                        }
                    });
}
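To make the semantics of this example concrete, here is a minimal plain-Java sketch with no Flink dependency. It only illustrates what a tumbling-window inner join computes: a pair is emitted when both records share the key and fall into the same window. The names WindowJoinSketch, Event, windowStart and joinInWindows are hypothetical, the window offset is assumed to be 0, timestamps are assumed non-negative, and watermarks and late data are ignored. It is not how Flink executes the join internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; not part of Flink.
class WindowJoinSketch {

    // A keyed record with an event timestamp in milliseconds.
    static final class Event {
        final String name;
        final int value;
        final long timestamp;

        Event(String name, int value, long timestamp) {
            this.name = name;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Start of the tumbling window that contains a non-negative timestamp (offset 0 assumed).
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // Emits "name,grade,salary" for every pair that shares both the key and the window.
    static List<String> joinInWindows(List<Event> grades, List<Event> salaries, long windowSize) {
        List<String> out = new ArrayList<>();
        for (Event g : grades) {
            for (Event s : salaries) {
                boolean sameKey = g.name.equals(s.name);
                boolean sameWindow =
                        windowStart(g.timestamp, windowSize) == windowStart(s.timestamp, windowSize);
                if (sameKey && sameWindow) {
                    out.add(g.name + "," + g.value + "," + s.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> grades = List.of(new Event("tom", 5, 100L), new Event("tom", 3, 2500L));
        List<Event> salaries = List.of(new Event("tom", 9000, 900L), new Event("ann", 7000, 200L));
        // Only the grade at t=100 shares the window [0, 1000) with tom's salary.
        System.out.println(joinInWindows(grades, salaries, 1000L)); // prints [tom,5,9000]
    }
}
```

The grade at t=2500 is dropped because no salary for the same key lands in its window, and "ann" is dropped because there is no matching grade at all.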
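- Calling .join() directly on a DataStream returns a JoinedStreams.
- The user-provided join function is passed in last, through .apply(). That apply builds a coGroup over the same keys and window and wraps the JoinFunction in a JoinCoGroupFunction: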
public <T> DataStream<T> apply(
        JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
    // clean the closure
    function = input1.getExecutionEnvironment().clean(function);
    coGroupedWindowedStream =
            input1.coGroup(input2)
                    .where(keySelector1)
                    .equalTo(keySelector2)
                    .window(windowAssigner)
                    .trigger(trigger)
                    .evictor(evictor)
                    .allowedLateness(allowedLateness);
    return coGroupedWindowedStream.apply(new JoinCoGroupFunction<>(function), resultType);
}
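- In the apply of coGroupedWindowedStream you can see that the two streams are each tagged, then unioned into a single stream, keyed, and windowed. The result is then handed to the apply of WindowedStream: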
// coGroupedWindowedStream#apply
public <T> DataStream<T> apply(
        CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
    // clean the closure
    function = input1.getExecutionEnvironment().clean(function);
    UnionTypeInfo<T1, T2> unionType =
            new UnionTypeInfo<>(input1.getType(), input2.getType());
    UnionKeySelector<T1, T2, KEY> unionKeySelector =
            new UnionKeySelector<>(keySelector1, keySelector2);
    DataStream<TaggedUnion<T1, T2>> taggedInput1 =
            input1.map(new Input1Tagger<T1, T2>())
                    .setParallelism(input1.getParallelism())
                    .returns(unionType);
    DataStream<TaggedUnion<T1, T2>> taggedInput2 =
            input2.map(new Input2Tagger<T1, T2>())
                    .setParallelism(input2.getParallelism())
                    .returns(unionType);
    DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
    // we explicitly create the keyed stream to manually pass the key type information in
    windowedStream =
            new KeyedStream<TaggedUnion<T1, T2>, KEY>(
                            unionStream, unionKeySelector, keyType)
                    .window(windowAssigner);
    if (trigger != null) {
        windowedStream.trigger(trigger);
    }
    if (evictor != null) {
        windowedStream.evictor(evictor);
    }
    if (allowedLateness != null) {
        windowedStream.allowedLateness(allowedLateness);
    }
    return windowedStream.apply(
            new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
}
// WindowedStream#apply
public <R> SingleOutputStreamOperator<R> apply(
        WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
    function = input.getExecutionEnvironment().clean(function);
    final String opName = builder.generateOperatorName();
    final String opDescription = builder.generateOperatorDescription(function, null);
    OneInputStreamOperator<T, R> operator = builder.apply(function);
    return input.transform(opName, resultType, operator).setDescription(opDescription);
}
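The tag, union, key, split sequence above can be sketched in plain Java without Flink. This is only an illustration under stated assumptions: TaggedUnionSketch, Tagged, CoGroup and unionThenCoGroup are hypothetical names, windows are left out (everything is treated as one window), and the final split-by-tag step is my reading of what the window function has to do before calling the user's CoGroupFunction, since that class is not shown above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration class; not part of Flink.
class TaggedUnionSketch {

    // Holds a value from exactly one of the two inputs, playing the role of TaggedUnion.
    static final class Tagged<A, B> {
        final A one;
        final B two;
        final boolean isOne;

        private Tagged(A one, B two, boolean isOne) {
            this.one = one;
            this.two = two;
            this.isOne = isOne;
        }

        static <A, B> Tagged<A, B> one(A value) {
            return new Tagged<>(value, null, true);
        }

        static <A, B> Tagged<A, B> two(B value) {
            return new Tagged<>(null, value, false);
        }
    }

    // Hypothetical stand-in for CoGroupFunction, with a List as the collector.
    interface CoGroup<A, B, R> {
        void coGroup(List<A> first, List<B> second, List<R> out);
    }

    static <A, B, K, R> List<R> unionThenCoGroup(
            List<A> input1, List<B> input2,
            Function<A, K> key1, Function<B, K> key2,
            CoGroup<A, B, R> function) {
        // Step 1: tag both inputs and union them into one stream of a single type.
        List<Tagged<A, B>> union = new ArrayList<>();
        for (A a : input1) {
            union.add(Tagged.<A, B>one(a));
        }
        for (B b : input2) {
            union.add(Tagged.<A, B>two(b));
        }

        // Step 2: key the unioned stream, choosing the key selector by tag.
        Map<K, List<Tagged<A, B>>> byKey = new LinkedHashMap<>();
        for (Tagged<A, B> t : union) {
            K key = t.isOne ? key1.apply(t.one) : key2.apply(t.two);
            byKey.computeIfAbsent(key, ignored -> new ArrayList<>()).add(t);
        }

        // Step 3: per key, split the group by tag again and call the co-group function.
        List<R> out = new ArrayList<>();
        for (List<Tagged<A, B>> group : byKey.values()) {
            List<A> first = new ArrayList<>();
            List<B> second = new ArrayList<>();
            for (Tagged<A, B> t : group) {
                if (t.isOne) {
                    first.add(t.one);
                } else {
                    second.add(t.two);
                }
            }
            function.coGroup(first, second, out);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> sizes = TaggedUnionSketch.<String, String, String, String>unionThenCoGroup(
                List.of("a1", "b1"), List.of("a2", "a3"),
                s -> s.substring(0, 1), s -> s.substring(0, 1),
                (first, second, out) -> out.add(first.size() + "/" + second.size()));
        System.out.println(sizes); // prints [1/2, 1/0]
    }
}
```

The point of the tagging is that a single-input keyed window operator can then handle both streams: the tag is the only thing that remembers which side a record came from.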
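- As for the JoinFunction itself: JoinCoGroupFunction<>(function) wraps the user's join implementation and calls it for every pair of elements from the two sides of a group: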
private static class JoinCoGroupFunction<T1, T2, T>
        extends WrappingFunction<JoinFunction<T1, T2, T>>
        implements CoGroupFunction<T1, T2, T> {
    private static final long serialVersionUID = 1L;

    public JoinCoGroupFunction(JoinFunction<T1, T2, T> wrappedFunction) {
        super(wrappedFunction);
    }

    @Override
    public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out)
            throws Exception {
        for (T1 val1 : first) {
            for (T2 val2 : second) {
                out.collect(wrappedFunction.join(val1, val2));
            }
        }
    }
}
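The nested loop in coGroup has two consequences worth spelling out: a group with n elements on the first side and m on the second produces n × m results, and a group where either side is empty produces nothing, which is what makes this an inner join. A minimal plain-Java sketch of the same loop, where JoinAsCoGroupSketch is a hypothetical name and BiFunction stands in for JoinFunction:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical illustration class; not part of Flink.
class JoinAsCoGroupSketch {

    // Same shape as the nested loop in JoinCoGroupFunction#coGroup:
    // call the join function once for every (first, second) pair.
    static <A, B, R> List<R> coGroup(List<A> first, List<B> second, BiFunction<A, B, R> join) {
        List<R> out = new ArrayList<>();
        for (A a : first) {
            for (B b : second) {
                out.add(join.apply(a, b));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // 2 x 2 elements give 4 results.
        System.out.println(coGroup(List.of(1, 2), List.of(10, 20), Integer::sum)); // prints [11, 21, 12, 22]
        // An empty side gives no results at all.
        System.out.println(coGroup(List.of(1, 2), List.<Integer>of(), Integer::sum)); // prints []
    }
}
```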