What is a StreamGraph?
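As the name suggests, a StreamGraph is a stream graph. It describes the topology of a streaming program and contains all the information needed to create a JobGraph. A StreamGraph consists of StreamNodes and StreamEdges. A StreamNode describes one operator in the streaming program together with its properties, and a StreamEdge connects two StreamNodes and represents a data stream flowing between them.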
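To make the node/edge structure concrete, here is a minimal Java sketch of a graph built from nodes and directed edges. The classes MiniStreamNode, MiniStreamEdge and MiniStreamGraph are hypothetical simplifications, not Flink classes; Flink's real StreamNode and StreamEdge carry much more (the operator, serializers, the partitioner, and so on).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified stand-in for a StreamNode: one operator plus a few properties.
final class MiniStreamNode {
    final int id;               // unique node id (Flink uses the transformation id)
    final String operatorName;  // human-readable operator name
    final int parallelism;
    final List<MiniStreamEdge> inEdges = new ArrayList<>();
    final List<MiniStreamEdge> outEdges = new ArrayList<>();

    MiniStreamNode(int id, String operatorName, int parallelism) {
        this.id = id;
        this.operatorName = operatorName;
        this.parallelism = parallelism;
    }
}

// Hypothetical stand-in for a StreamEdge: the data stream from one node to another.
final class MiniStreamEdge {
    final MiniStreamNode source;
    final MiniStreamNode target;

    MiniStreamEdge(MiniStreamNode source, MiniStreamNode target) {
        this.source = source;
        this.target = target;
    }
}

// Hypothetical graph holding the nodes by id; edges hang off the nodes they connect.
final class MiniStreamGraph {
    private final Map<Integer, MiniStreamNode> nodes = new LinkedHashMap<>();

    void addNode(int id, String operatorName, int parallelism) {
        nodes.put(id, new MiniStreamNode(id, operatorName, parallelism));
    }

    // Connects two existing nodes; the edge is recorded on both endpoints.
    void addEdge(int sourceId, int targetId) {
        MiniStreamNode source = nodes.get(sourceId);
        MiniStreamNode target = nodes.get(targetId);
        if (source == null || target == null) {
            throw new IllegalArgumentException("unknown node id");
        }
        MiniStreamEdge edge = new MiniStreamEdge(source, target);
        source.outEdges.add(edge);
        target.inEdges.add(edge);
    }

    MiniStreamNode getNode(int id) {
        return nodes.get(id);
    }

    int nodeCount() {
        return nodes.size();
    }
}
```

For example, a source, map, sink pipeline becomes three nodes and two edges, and each node can be asked for its incoming and outgoing streams.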
Why is a StreamGraph needed?
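Flink is a stream processing engine that supports two execution modes: stream processing and batch processing. At runtime, however, the two modes are unified rather than treated differently: both are described by a JobGraph. To describe the two modes at the API level, Flink defines two different data structures, StreamGraph for DataStream programs and Plan for DataSet programs, and generates the corresponding JobGraph from each of them.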
How is a StreamGraph created?
Before looking at how a StreamGraph is created, let's first introduce some core concepts and data structures related to it.
StreamExecutionEnvironment
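The execution environment of a streaming program. It holds the program's ExecutionConfig, CheckpointConfig, list of StreamTransformations, StateBackend, and so on. It is the entry point for generating a StreamGraph: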
```java
public StreamGraph getStreamGraph() {
    // at least one operator must have been registered before a graph can be built
    if (transformations.size() <= 0) {
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
    }
    return StreamGraphGenerator.generate(this, transformations);
}
```
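This guard means the user-facing API only records transformations, and nothing is translated until a graph is requested. A minimal sketch of that contract, assuming a hypothetical MiniEnvironment that stores transformation names instead of real StreamTransformation objects:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical mini environment: API calls only record transformations,
// and the graph is produced lazily when it is requested.
final class MiniEnvironment {
    private final List<String> transformations = new ArrayList<>();

    // Stand-in for the way DataStream operations register their transformation.
    MiniEnvironment addTransformation(String name) {
        transformations.add(name);
        return this;
    }

    // Mirrors the guard in getStreamGraph(): an empty topology cannot be executed.
    List<String> getStreamGraphNodes() {
        if (transformations.isEmpty()) {
            throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
        }
        // A real generator would translate these into StreamNodes and StreamEdges;
        // this sketch just returns the recorded names in registration order.
        return Collections.unmodifiableList(new ArrayList<>(transformations));
    }
}
```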
StreamGraphGenerator
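As the name suggests, this is the StreamGraph generator, the class that actually builds the StreamGraph. It takes the StreamTransformations produced while the stream topology was being constructed, parses them into the corresponding StreamNodes and StreamEdges, and assembles these into a StreamGraph.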
```java
private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
    // translate every registered transformation; transform() recurses into the inputs
    for (StreamTransformation<?> transformation : transformations) {
        transform(transformation);
    }
    return streamGraph;
}
```
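The loop itself just calls transform() on every registered transformation; deduplication happens inside transform() through the alreadyTransformed map, and inputs are reached by recursion. The sketch below models only that behavior for single-input chains. SimpleTransformation and SimpleGenerator are hypothetical names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-input transformation: an id, a name and an optional upstream input.
final class SimpleTransformation {
    final int id;
    final String name;
    final SimpleTransformation input; // null for a source

    SimpleTransformation(int id, String name, SimpleTransformation input) {
        this.id = id;
        this.name = name;
        this.input = input;
    }
}

final class SimpleGenerator {
    // Plays the role of alreadyTransformed: each transformation is translated only once.
    private final Map<SimpleTransformation, Collection<Integer>> alreadyTransformed = new HashMap<>();
    final List<String> createdNodes = new ArrayList<>();
    final List<String> createdEdges = new ArrayList<>();

    List<String> generate(List<SimpleTransformation> transformations) {
        for (SimpleTransformation t : transformations) {
            transform(t);
        }
        return createdNodes;
    }

    private Collection<Integer> transform(SimpleTransformation t) {
        Collection<Integer> cached = alreadyTransformed.get(t);
        if (cached != null) {
            return cached; // already translated, reuse its node ids
        }
        // Translate the upstream first (recursion walks towards the source).
        Collection<Integer> inputIds = t.input == null
            ? Collections.<Integer>emptyList()
            : transform(t.input);
        createdNodes.add(t.name);
        for (Integer inputId : inputIds) {
            createdEdges.add(inputId + "->" + t.id);
        }
        Collection<Integer> result = Collections.singleton(t.id);
        alreadyTransformed.put(t, result);
        return result;
    }
}
```

Given the list [map, sink], the source is discovered through recursion (in Flink 1.x, sources are typically not registered in the environment's transformation list themselves), and the map is translated only once even though it is reached twice.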
StreamTransformation
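A StreamTransformation represents an operation that creates a DataStream, and every DataStream has an underlying StreamTransformation. However, not every StreamTransformation corresponds to a physical operation at runtime. For example:
In the figure above, the left side shows the graph that results from building the stream topology, while the right side shows the operator graph that actually runs.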
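One way a transformation can exist without a runtime operator is by becoming a virtual node. In Flink 1.x, for instance, a PartitionTransformation is translated into a virtual partition node, and an edge added from that virtual id is redirected to the real upstream node and tagged with the partitioner. The following is a simplified sketch of that idea; VirtualNodeGraph and the string partitioner names are hypothetical, and the "forward" default stands in for Flink's more nuanced default partitioner choice.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a partitioning step becomes a virtual node that only remembers
// (upstream id, partitioner). Edges from the virtual id are redirected to the real node.
final class VirtualNodeGraph {
    private static final class VirtualPartition {
        final int originalId;
        final String partitioner;

        VirtualPartition(int originalId, String partitioner) {
            this.originalId = originalId;
            this.partitioner = partitioner;
        }
    }

    private final Map<Integer, String> physicalNodes = new HashMap<>();
    private final Map<Integer, VirtualPartition> virtualPartitions = new HashMap<>();
    final List<String> edges = new ArrayList<>();

    void addNode(int id, String name) {
        physicalNodes.put(id, name);
    }

    // No physical node is created here, only a mapping from the virtual id.
    void addVirtualPartitionNode(int originalId, int virtualId, String partitioner) {
        virtualPartitions.put(virtualId, new VirtualPartition(originalId, partitioner));
    }

    void addEdge(int upstreamId, int downstreamId) {
        addEdge(upstreamId, downstreamId, "forward"); // simplified default
    }

    private void addEdge(int upstreamId, int downstreamId, String partitioner) {
        VirtualPartition virtual = virtualPartitions.get(upstreamId);
        if (virtual != null) {
            // Resolve the virtual node to its real upstream and keep its partitioner.
            addEdge(virtual.originalId, downstreamId, virtual.partitioner);
            return;
        }
        edges.add(physicalNodes.get(upstreamId) + " -[" + partitioner + "]-> "
            + physicalNodes.get(downstreamId));
    }

    int physicalNodeCount() {
        return physicalNodes.size();
    }
}
```

A source followed by a rebalance and a map therefore yields only two physical nodes, connected by a single edge that carries the rebalance partitioner.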
StreamTransformation is an abstract class with many concrete implementations:
- SourceTransformation
- OneInputTransformation
- SideOutputTransformation
- SelectTransformation
- PartitionTransformation
- CoFeedbackTransformation
- FeedbackTransformation
- UnionTransformation
- SplitTransformation
- TwoInputTransformation
- SinkTransformation
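Each type of StreamTransformation stands for a particular kind of operation, and StreamNodes and StreamEdges are created by parsing the concrete StreamTransformation. As mentioned above, however, not every type creates a StreamNode: transformations such as PartitionTransformation, UnionTransformation, SplitTransformation, SelectTransformation and SideOutputTransformation carry no operator of their own and only affect how the edges are wired. Let's look at the generation process in detail: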
```java
private Collection<Integer> transform(StreamTransformation<?> transform) {
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }
    LOG.debug("Transforming " + transform);
    if (transform.getMaxParallelism() <= 0) {
        // if the max parallelism hasn't been set, then first use the job wide max parallelism
        // from the ExecutionConfig.
        int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
        if (globalMaxParallelismFromConfig > 0) {
            transform.setMaxParallelism(globalMaxParallelismFromConfig);
        }
    }
    // call at least once to trigger exceptions about MissingTypeInfo
    transform.getOutputType();
    Collection<Integer> transformedIds;
    if (transform instanceof OneInputTransformation<?, ?>) {
        transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
    } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
        transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
    } else if (transform instanceof SourceTransformation<?>) {
        transformedIds = transformSource((SourceTransformation<?>) transform);
    } else if (transform instanceof SinkTransformation<?>) {
        transformedIds = transformSink((SinkTransformation<?>) transform);
    } else if (transform instanceof UnionTransformation<?>) {
        transformedIds = transformUnion((UnionTransformation<?>) transform);
    } else if (transform instanceof SplitTransformation<?>) {
        transformedIds = transformSplit((SplitTransformation<?>) transform);
    } else if (transform instanceof SelectTransformation<?>) {
        transformedIds = transformSelect((SelectTransformation<?>) transform);
    } else if (transform instanceof FeedbackTransformation<?>) {
        transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
    } else if (transform instanceof CoFeedbackTransformation<?>) {
        transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
    } else if (transform instanceof PartitionTransformation<?>) {
        transformedIds = transformPartition((PartitionTransformation<?>) transform);
    } else if (transform instanceof SideOutputTransformation<?>) {
        transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
    } else {
        throw new IllegalStateException("Unknown transformation: " + transform);
    }
    // need this check because the iterate transformation adds itself before
    // transforming the feedback edges
    if (!alreadyTransformed.containsKey(transform)) {
        alreadyTransformed.put(transform, transformedIds);
    }
    if (transform.getBufferTimeout() > 0) {
        streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
    }
    if (transform.getUid() != null) {
        streamGraph.setTransformationUID(transform.getId(), transform.getUid());
    }
    if (transform.getUserProvidedNodeHash() != null) {
        streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
    }
    if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
        streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
    }
    return transformedIds;
}
```
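The max-parallelism fallback at the top of transform() can be read in isolation as a small pure function. The sketch below uses a hypothetical helper name and assumes, as the checks in the code suggest, that values <= 0 mean "not set":

```java
// Hypothetical helper isolating the fallback at the top of transform():
// a transformation without its own max parallelism (<= 0) inherits the
// job-wide value from the ExecutionConfig, but only if that value is set (> 0).
final class MaxParallelismFallback {
    private MaxParallelismFallback() {}

    static int effectiveMaxParallelism(int transformationMax, int globalMaxFromConfig) {
        if (transformationMax <= 0 && globalMaxFromConfig > 0) {
            return globalMaxFromConfig;
        }
        // Otherwise keep the transformation's own value (possibly still unset).
        return transformationMax;
    }
}
```

For example, a transformation without its own value (-1) under a job-wide setting of 128 ends up with 128, while an explicitly set 64 is left untouched.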
Taking OneInputTransformation as an example, let's see how StreamNodes and StreamEdges are created.
```java
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
    Collection<Integer> inputIds = transform(transform.getInput());
    // the recursive call might have already transformed this
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }
    String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
    streamGraph.addOperator(transform.getId(),
            slotSharingGroup,
            transform.getOperator(),
            transform.getInputType(),
            transform.getOutputType(),
            transform.getName());
    if (transform.getStateKeySelector() != null) {
        TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
        streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
    }
    streamGraph.setParallelism(transform.getId(), transform.getParallelism());
    streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
    for (Integer inputId : inputIds) {
        streamGraph.addEdge(inputId, transform.getId(), 0);
    }
    return Collections.singleton(transform.getId());
}
```
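Step 1: transform the input of the OneInputTransformation to obtain the ids of the input nodes. The input is itself a StreamTransformation, so this is a recursive process that walks from the last StreamTransformation all the way back to the first one.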
```java
Collection<Integer> inputIds = transform(transform.getInput());
```
Step 2: create the StreamNode and add it to the StreamGraph.
```java
streamGraph.addOperator(transform.getId(),
        slotSharingGroup,
        transform.getOperator(),
        transform.getInputType(),
        transform.getOutputType(),
        transform.getName());
```
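The slotSharingGroup passed to addOperator comes from determineSlotSharingGroup. As far as we understand the Flink 1.x Javadoc for that method, the rule is: a group set by the user is used as is; otherwise the operator inherits its inputs' group if all inputs agree, and falls back to "default" if they do not. A hedged sketch, where the hypothetical SlotSharingRule class and a Map from node id to group name stand in for querying the StreamGraph:

```java
import java.util.Collection;
import java.util.Map;

// Hypothetical sketch of the slot-sharing-group rule described for
// determineSlotSharingGroup; the Map replaces a StreamGraph lookup.
final class SlotSharingRule {
    static final String DEFAULT_GROUP = "default";

    private SlotSharingRule() {}

    static String determine(String specifiedGroup,
                            Collection<Integer> inputIds,
                            Map<Integer, String> groupOfNode) {
        if (specifiedGroup != null) {
            return specifiedGroup; // an explicit user choice wins
        }
        String inputGroup = null;
        for (Integer id : inputIds) {
            String candidate = groupOfNode.get(id);
            if (inputGroup == null) {
                inputGroup = candidate;
            } else if (!inputGroup.equals(candidate)) {
                return DEFAULT_GROUP; // inputs disagree
            }
        }
        // No inputs (e.g. a source) or no group found: use the default group.
        return inputGroup == null ? DEFAULT_GROUP : inputGroup;
    }
}
```

Under this rule a source, which has no inputs, lands in "default" unless the user sets a group explicitly.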
Step 3: add to the StreamGraph the edges that connect each input node to this node.
```java
for (Integer inputId : inputIds) {
    streamGraph.addEdge(inputId, transform.getId(), 0);
}
```
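The loop adds one edge per input id, which matters when the input is not a single operator. A UnionTransformation, for example, creates no StreamNode of its own and simply forwards its inputs' ids, so a map after a union gets one edge per union input. A sketch with hypothetical names, where edges are recorded as strings and 0 is the type number passed for one-input operators:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical sketch of step 3 when the input is a union: the union adds no node,
// it only forwards the ids of its inputs, so the downstream operator gets several edges.
final class UnionEdgeDemo {
    final List<String> edges = new ArrayList<>();

    // Stand-in for union translation: concatenate the ids produced by each input.
    static Collection<Integer> transformUnion(List<? extends Collection<Integer>> inputIdsPerInput) {
        List<Integer> result = new ArrayList<>();
        for (Collection<Integer> ids : inputIdsPerInput) {
            result.addAll(ids);
        }
        return result;
    }

    // Mirrors the edge loop in transformOneInputTransform.
    void connect(Collection<Integer> inputIds, int targetId) {
        for (Integer inputId : inputIds) {
            addEdge(inputId, targetId, 0);
        }
    }

    private void addEdge(int upstreamId, int downstreamId, int typeNumber) {
        edges.add(upstreamId + "->" + downstreamId + " (type " + typeNumber + ")");
    }
}
```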
Once all StreamTransformations have been traversed, the corresponding StreamGraph is complete.