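What is a StreamGraph
A StreamGraph encapsulates the topology of a Flink job's execution flow. On the Flink client side, when the Environment's execute() method is called, the data processing pipeline written by the user is converted into a StreamGraph.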
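What each operator eventually becomes
The operators of a Flink streaming job are stored as a series of transformations. For details, see the article Flink Source Code: Basic Operators.
The following uses DataStream's map method as an example.
DataStream's map method: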
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
    // Get the return type of the mapper function
    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
            Utils.getCallLocationName(), true);
    // Wrap the map operation in a transformation and return a SingleOutputStreamOperator
    return transform("Map", outType, new StreamMap<>(clean(mapper)));
}
The code of the transform method:
public <R> SingleOutputStreamOperator<R> transform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        OneInputStreamOperator<T, R> operator) {
    // Wrap the operator in an operator factory
    return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}
The doTransform method:
private <R> SingleOutputStreamOperator<R> doTransform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        StreamOperatorFactory<R> operatorFactory) {
    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    // If this stream's (input) transformation has MissingTypeInfo as its output type, an exception is thrown here
    transformation.getOutputType();
    // Build the new transformation
    // A map transformation has exactly one input, so it is a OneInputTransformation
    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            this.transformation,
            operatorName,
            operatorFactory,
            outTypeInfo,
            environment.getParallelism());
    // Build the returned stream so that further operators can be chained onto it
    @SuppressWarnings({"unchecked", "rawtypes"})
    SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
    // Register the transformation with the StreamExecutionEnvironment,
    // which keeps an ArrayList named transformations that stores all transformations
    getExecutionEnvironment().addOperator(resultTransform);
    return returnStream;
}
At this point, the map operator has been wrapped as a transformation and stored in the StreamExecutionEnvironment.
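The bookkeeping described above can be pictured with a toy model. All classes below (ToyEnvironment, ToyStream, ToyTransformation) are hypothetical stand-ins, not Flink's API; the sketch only mirrors the idea that each map call creates a new one-input transformation pointing at its predecessor and appends it to a list held by the environment.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of how DataStream.map records a transformation.
// Every class here is a hypothetical stand-in, not part of Flink.
final class MapToTransformationSketch {

    // Stand-in for a Transformation: an id, a name and at most one input.
    static final class ToyTransformation {
        final int id;
        final String name;
        final ToyTransformation input; // null for the source

        ToyTransformation(int id, String name, ToyTransformation input) {
            this.id = id;
            this.name = name;
            this.input = input;
        }
    }

    // Stand-in for StreamExecutionEnvironment: hands out ids and collects transformations.
    static final class ToyEnvironment {
        final List<ToyTransformation> transformations = new ArrayList<>();
        int nextId = 1;

        ToyStream fromSource(String name) {
            // This toy only registers transformations created by map(),
            // mirroring the addOperator call in doTransform.
            return new ToyStream(this, new ToyTransformation(nextId++, name, null));
        }
    }

    // Stand-in for DataStream / SingleOutputStreamOperator.
    static final class ToyStream {
        final ToyEnvironment env;
        final ToyTransformation transformation;

        ToyStream(ToyEnvironment env, ToyTransformation transformation) {
            this.env = env;
            this.transformation = transformation;
        }

        // Like doTransform: create a one-input transformation whose input is this stream's
        // transformation, register it with the environment and return a new stream.
        ToyStream map(String operatorName) {
            ToyTransformation result = new ToyTransformation(env.nextId++, operatorName, transformation);
            env.transformations.add(result);
            return new ToyStream(env, result);
        }
    }

    // Formats the registered transformations as "id:name(input=inputId)".
    static String describe(List<ToyTransformation> transformations) {
        StringBuilder sb = new StringBuilder();
        for (ToyTransformation t : transformations) {
            if (sb.length() > 0) {
                sb.append(", ");
            }
            String inputId = t.input == null ? "none" : String.valueOf(t.input.id);
            sb.append(t.id).append(':').append(t.name).append("(input=").append(inputId).append(')');
        }
        return sb.toString();
    }

    static String demo() {
        ToyEnvironment env = new ToyEnvironment();
        env.fromSource("Source").map("Map").map("AnotherMap");
        return describe(env.transformations);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 2:Map(input=1), 3:AnotherMap(input=2)
    }
}
```

Each call returns a new stream object wrapping the new transformation, which is what makes fluent chaining like `.map(...).map(...)` possible.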
Types of transformation
OneInputTransformation
As the name suggests, a OneInputTransformation has only one input. Representative (single-stream) operators include map, flatMap, filter, process and assignTimestamps.
TwoInputTransformation
A TwoInputTransformation has two inputs. Operators on ConnectedStreams work on two streams, so they are converted into TwoInputTransformations.
SourceTransformation
When a data source is configured on the env, a DataStreamSource is created. This object is the head of a DataStream, and its constructor creates a SourceTransformation.
SinkTransformation
Similar to SourceTransformation: when a DataStream calls addSink, a DataStreamSink object is created, and constructing it also creates a SinkTransformation.
UnionTransformation
This transformation merges multiple inputs into one stream. The representative operator is union.
SplitTransformation
When a DataStream calls split, a SplitStream is created. The SplitStream builds a SplitTransformation when it is initialized.
SelectTransformation
When a SplitStream calls the select operator, a SelectTransformation is created.
FeedbackTransformation
This transformation is used when an IterativeStream is created.
CoFeedbackTransformation
Similar to FeedbackTransformation; it is used when a ConnectedIterativeStreams is created.
PartitionTransformation
All operators that control where data is routed are PartitionTransformations, for example shuffle, forward, rebalance, broadcast, rescale, global, partitionCustom and keyBy.
SideOutputTransformation
Calling getSideOutput (to retrieve a side output) creates a SideOutputTransformation.
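To make the differences in input count concrete, here is a minimal sketch using hypothetical toy classes, not Flink's Transformation hierarchy. It models a source with no input, a one-input transformation, a two-input transformation and a union with any number of inputs; the resulting input references form a DAG.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy hierarchy (hypothetical, not Flink classes) showing how the transformation
// kinds differ in how many upstream inputs they reference.
final class TransformationKindsSketch {

    abstract static class ToyTransformation {
        final String name;

        ToyTransformation(String name) {
            this.name = name;
        }

        abstract List<ToyTransformation> getInputs();
    }

    // Like SourceTransformation: the head of a stream, no input.
    static final class ToySource extends ToyTransformation {
        ToySource(String name) { super(name); }
        List<ToyTransformation> getInputs() { return Collections.emptyList(); }
    }

    // Like OneInputTransformation (SinkTransformation and PartitionTransformation also have one input).
    static final class ToyOneInput extends ToyTransformation {
        final ToyTransformation input;
        ToyOneInput(String name, ToyTransformation input) { super(name); this.input = input; }
        List<ToyTransformation> getInputs() { return Collections.singletonList(input); }
    }

    // Like TwoInputTransformation, produced by operators on ConnectedStreams.
    static final class ToyTwoInput extends ToyTransformation {
        final ToyTransformation first;
        final ToyTransformation second;
        ToyTwoInput(String name, ToyTransformation first, ToyTransformation second) {
            super(name);
            this.first = first;
            this.second = second;
        }
        List<ToyTransformation> getInputs() { return Arrays.asList(first, second); }
    }

    // Like UnionTransformation: any number of inputs merged into one stream.
    static final class ToyUnion extends ToyTransformation {
        final List<ToyTransformation> inputs;
        ToyUnion(String name, List<ToyTransformation> inputs) { super(name); this.inputs = new ArrayList<>(inputs); }
        List<ToyTransformation> getInputs() { return Collections.unmodifiableList(inputs); }
    }

    // Builds a small DAG and reports "name=numberOfInputs" for a few of its nodes.
    static String demo() {
        ToySource a = new ToySource("sourceA");
        ToySource b = new ToySource("sourceB");
        ToySource c = new ToySource("sourceC");
        ToyOneInput map = new ToyOneInput("map", a);
        ToyTwoInput coMap = new ToyTwoInput("coMap", map, b);
        ToyUnion union = new ToyUnion("union", Arrays.asList(coMap, c, a));
        StringBuilder sb = new StringBuilder();
        for (ToyTransformation t : Arrays.asList(a, map, coMap, union)) {
            if (sb.length() > 0) {
                sb.append(", ");
            }
            sb.append(t.name).append('=').append(t.getInputs().size());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // sourceA=0, map=1, coMap=2, union=3
    }
}
```

Note that sourceA is reachable through several paths here, which is exactly why StreamGraphGenerator has to remember which transformations it has already processed.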
Building the StreamGraph from transformations
We start analyzing how the StreamGraph is generated from the execute method of StreamExecutionEnvironment.
Source code of StreamExecutionEnvironment's execute method:
public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
    return execute(getStreamGraph(jobName));
}
This method calls getStreamGraph, which looks like this:
public StreamGraph getStreamGraph(String jobName) {
    // Create a StreamGraphGenerator, configure it, and call generate() to produce the StreamGraph
    return getStreamGraphGenerator().setJobName(jobName).generate();
}
private StreamGraphGenerator getStreamGraphGenerator() {
    if (transformations.size() <= 0) {
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
    }
    // Pass the transformations and the other settings to the StreamGraphGenerator
    return new StreamGraphGenerator(transformations, config, checkpointCfg)
            .setStateBackend(defaultStateBackend)
            .setChaining(isChainingEnabled)
            .setUserArtifacts(cacheFile)
            .setTimeCharacteristic(timeCharacteristic)
            .setDefaultBufferTimeout(bufferTimeout);
}
Next, we follow into StreamGraphGenerator's generate method:
public StreamGraph generate() {
    // Create the StreamGraph with the execution config and the checkpoint config
    streamGraph = new StreamGraph(executionConfig, checkpointConfig);
    // Set the state backend
    streamGraph.setStateBackend(stateBackend);
    // Set whether operator chaining (an optimization) is enabled
    streamGraph.setChaining(chaining);
    // Set the schedule mode, which decides whether tasks are scheduled lazily or eagerly
    streamGraph.setScheduleMode(scheduleMode);
    // Receives the cacheFile of StreamExecutionEnvironment;
    // cacheFile holds user files that must be distributed to every TaskManager
    streamGraph.setUserArtifacts(userArtifacts);
    // Set the time characteristic: event time, processing time or ingestion time
    streamGraph.setTimeCharacteristic(timeCharacteristic);
    // Set the job name
    streamGraph.setJobName(jobName);
    // Set whether the connections between chains are blocking
    streamGraph.setBlockingConnectionsBetweenChains(blockingConnectionsBetweenChains);
    // Records the transformations that have already been processed
    alreadyTransformed = new HashMap<>();
    // Process the transformations one by one
    for (Transformation<?> transformation: transformations) {
        transform(transformation);
    }
    // Keep a reference to the finished StreamGraph
    final StreamGraph builtStreamGraph = streamGraph;
    // Clear the intermediate state
    alreadyTransformed.clear();
    alreadyTransformed = null;
    streamGraph = null;
    return builtStreamGraph;
}
So the key is the call transform(transformation).
The transform method:
private Collection<Integer> transform(Transformation<?> transform) {
    // If this transformation has already been processed, return the cached result
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }
    LOG.debug("Transforming " + transform);
    // If the transformation's max parallelism is not set but the job-wide max parallelism is,
    // apply the job-wide max parallelism to the transformation
    if (transform.getMaxParallelism() <= 0) {
        // if the max parallelism hasn't been set, then first use the job wide max parallelism
        // from the ExecutionConfig.
        int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
        if (globalMaxParallelismFromConfig > 0) {
            transform.setMaxParallelism(globalMaxParallelismFromConfig);
        }
    }
    // call at least once to trigger exceptions about MissingTypeInfo
    // Check the transformation's output type; if it is MissingTypeInfo, an exception is thrown
    transform.getOutputType();
    Collection<Integer> transformedIds;
    // Dispatch to a dedicated handler depending on the concrete transformation type
    if (transform instanceof OneInputTransformation<?, ?>) {
        transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
    } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
        transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
    } else if (transform instanceof SourceTransformation<?>) {
        transformedIds = transformSource((SourceTransformation<?>) transform);
    } else if (transform instanceof SinkTransformation<?>) {
        transformedIds = transformSink((SinkTransformation<?>) transform);
    } else if (transform instanceof UnionTransformation<?>) {
        transformedIds = transformUnion((UnionTransformation<?>) transform);
    } else if (transform instanceof SplitTransformation<?>) {
        transformedIds = transformSplit((SplitTransformation<?>) transform);
    } else if (transform instanceof SelectTransformation<?>) {
        transformedIds = transformSelect((SelectTransformation<?>) transform);
    } else if (transform instanceof FeedbackTransformation<?>) {
        transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
    } else if (transform instanceof CoFeedbackTransformation<?>) {
        transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
    } else if (transform instanceof PartitionTransformation<?>) {
        transformedIds = transformPartition((PartitionTransformation<?>) transform);
    } else if (transform instanceof SideOutputTransformation<?>) {
        transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
    } else {
        throw new IllegalStateException("Unknown transformation: " + transform);
    }
    // need this check because the iterate transformation adds itself before
    // transforming the feedback edges
    // If this transformation has not been recorded yet, add it to the processed map.
    // Handling a transformation first handles its inputs (there may be none, one or several),
    // so transform is called recursively. Checking alreadyTransformed both before and after
    // the dispatch ensures a transformation is not processed twice during the recursion.
    if (!alreadyTransformed.containsKey(transform)) {
        alreadyTransformed.put(transform, transformedIds);
    }
    // Set the network buffer timeout
    if (transform.getBufferTimeout() >= 0) {
        streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
    } else {
        streamGraph.setBufferTimeout(transform.getId(), defaultBufferTimeout);
    }
    // Set the uid
    if (transform.getUid() != null) {
        streamGraph.setTransformationUID(transform.getId(), transform.getUid());
    }
    // Set the UserProvidedNodeHash
    if (transform.getUserProvidedNodeHash() != null) {
        streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
    }
    // If auto-generated UIDs are disabled and neither a uid nor a UserProvidedNodeHash is set, throw an exception
    if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
        if (transform.getUserProvidedNodeHash() == null && transform.getUid() == null) {
            throw new IllegalStateException("Auto generated UIDs have been disabled " +
                    "but no UID or hash has been assigned to operator " + transform.getName());
        }
    }
    // Set the transformation's minimum and preferred resources
    if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
        streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
    }
    return transformedIds;
}
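The interplay between generate, the recursive transform calls and alreadyTransformed can be sketched as follows. This is a simplified model, not Flink code: the Node class and the processingOrder list are hypothetical, and only the "inputs first, process once" logic is kept. A source shared by two branches is reached along both paths but processed only once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of StreamGraphGenerator.generate()/transform() with hypothetical classes.
final class MemoizedTransformSketch {

    // Stand-in for a Transformation with zero or more inputs.
    static final class Node {
        final int id;
        final List<Node> inputs;

        Node(int id, Node... inputs) {
            this.id = id;
            this.inputs = Arrays.asList(inputs);
        }
    }

    // Plays the role of alreadyTransformed.
    private final Map<Node, Collection<Integer>> alreadyTransformed = new HashMap<>();
    // Order in which nodes were actually processed (illustration only, not in Flink).
    private final List<Integer> processingOrder = new ArrayList<>();

    Collection<Integer> transform(Node node) {
        // First check: return the cached result if this node was already processed.
        if (alreadyTransformed.containsKey(node)) {
            return alreadyTransformed.get(node);
        }
        // Process the inputs first (recursive calls).
        for (Node input : node.inputs) {
            transform(input);
        }
        // Second check. It never fires in this acyclic toy; in Flink it guards against
        // iteration transformations that register themselves before their feedback edges.
        if (alreadyTransformed.containsKey(node)) {
            return alreadyTransformed.get(node);
        }
        processingOrder.add(node.id);
        Collection<Integer> ids = Collections.singleton(node.id);
        alreadyTransformed.put(node, ids);
        return ids;
    }

    // Like generate(): call transform for every registered transformation, in list order.
    static List<Integer> generate(List<Node> transformations) {
        MemoizedTransformSketch generator = new MemoizedTransformSketch();
        for (Node t : transformations) {
            generator.transform(t);
        }
        return generator.processingOrder;
    }

    static String demo() {
        Node source = new Node(1);
        Node mapA = new Node(2, source);
        Node mapB = new Node(3, source);
        Node coMap = new Node(4, mapA, mapB); // both branches share the same source
        // The list starts with the downstream node on purpose; recursion still handles inputs first.
        return generate(Arrays.asList(coMap, mapA, mapB)).toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1, 2, 3, 4]
    }
}
```

Because every transformation is memoized, the order of the registered list does not matter for correctness: each node is emitted after its inputs and exactly once.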
Next, let's analyze StreamGraph generation starting from the transformSource method:
private <T> Collection<Integer> transformSource(SourceTransformation<T> source) {
    // Determine the slot sharing group; unless one was set explicitly, this is "default" for a source
    String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), Collections.emptyList());
    // Add the source to the StreamGraph
    streamGraph.addSource(source.getId(),
            slotSharingGroup,
            source.getCoLocationGroupKey(),
            source.getOperatorFactory(),
            null,
            source.getOutputType(),
            "Source: " + source.getName());
    // If the operator factory wraps an InputFormat, register that input format
    if (source.getOperatorFactory() instanceof InputFormatOperatorFactory) {
        streamGraph.setInputFormat(source.getId(),
                ((InputFormatOperatorFactory<T>) source.getOperatorFactory()).getInputFormat());
    }
    // Set the parallelism
    int parallelism = source.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
            source.getParallelism() : executionConfig.getParallelism();
    streamGraph.setParallelism(source.getId(), parallelism);
    streamGraph.setMaxParallelism(source.getId(), source.getMaxParallelism());
    return Collections.singleton(source.getId());
}
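The parallelism fallbacks in transform and transformSource reduce to two small rules. The sketch below restates them with hypothetical method names and assumes ExecutionConfig.PARALLELISM_DEFAULT is -1 (check the value in your Flink version).

```java
// Restates the two parallelism fallbacks; method names are hypothetical.
final class ParallelismFallbackSketch {
    // Assumed value of ExecutionConfig.PARALLELISM_DEFAULT.
    static final int PARALLELISM_DEFAULT = -1;

    // transformSource / transformOneInputTransform: keep the operator's own parallelism
    // unless it is still the "use default" marker, then fall back to the job-wide value.
    static int effectiveParallelism(int transformationParallelism, int configParallelism) {
        return transformationParallelism != PARALLELISM_DEFAULT
                ? transformationParallelism
                : configParallelism;
    }

    // transform(): an unset max parallelism (<= 0) is replaced by the job-wide max
    // parallelism, but only if the job-wide value is itself set (> 0).
    static int effectiveMaxParallelism(int transformationMaxParallelism, int configMaxParallelism) {
        if (transformationMaxParallelism <= 0 && configMaxParallelism > 0) {
            return configMaxParallelism;
        }
        return transformationMaxParallelism;
    }

    public static void main(String[] args) {
        System.out.println(effectiveParallelism(-1, 4));       // 4
        System.out.println(effectiveParallelism(2, 4));        // 2
        System.out.println(effectiveMaxParallelism(-1, 128));  // 128
        System.out.println(effectiveMaxParallelism(64, 128));  // 64
    }
}
```

In Flink the max-parallelism rule mutates the transformation through setMaxParallelism, whereas this sketch simply returns the resulting value.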
StreamGraph's addSource method:
public <IN, OUT> void addSource(Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        StreamOperatorFactory<OUT> operatorFactory,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName) {
    addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo, outTypeInfo, operatorName);
    sources.add(vertexID);
}
This method does little: it first adds the operator, then registers the vertex ID of the SourceTransformation as one of the StreamGraph's sources.
The addOperator method:
public <IN, OUT> void addOperator(
        Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        StreamOperatorFactory<OUT> operatorFactory,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName) {
    // Check whether this is a StreamSource. A data source's SourceFunction is wrapped
    // in a StreamSource object, so for a source this returns true
    if (operatorFactory.isStreamSource()) {
        addNode(vertexID, slotSharingGroup, coLocationGroup, SourceStreamTask.class, operatorFactory, operatorName);
    } else {
        addNode(vertexID, slotSharingGroup, coLocationGroup, OneInputStreamTask.class, operatorFactory, operatorName);
    }
    // Build serializers for the input and output types
    TypeSerializer<IN> inSerializer = inTypeInfo != null && !(inTypeInfo instanceof MissingTypeInfo) ? inTypeInfo.createSerializer(executionConfig) : null;
    TypeSerializer<OUT> outSerializer = outTypeInfo != null && !(outTypeInfo instanceof MissingTypeInfo) ? outTypeInfo.createSerializer(executionConfig) : null;
    // Set the serializers
    setSerializers(vertexID, inSerializer, null, outSerializer);
    // Set the input and output types on the operatorFactory
    if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) {
        // sets the output type which must be known at StreamGraph creation time
        operatorFactory.setOutputType(outTypeInfo, executionConfig);
    }
    if (operatorFactory.isInputTypeConfigurable()) {
        operatorFactory.setInputType(inTypeInfo, executionConfig);
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug("Vertex: {}", vertexID);
    }
}
The addNode method:
protected StreamNode addNode(Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        Class<? extends AbstractInvokable> vertexClass,
        StreamOperatorFactory<?> operatorFactory,
        String operatorName) {
    if (streamNodes.containsKey(vertexID)) {
        throw new RuntimeException("Duplicate vertexID " + vertexID);
    }
    // Create a StreamNode and put it into the streamNodes map
    StreamNode vertex = new StreamNode(
            vertexID,
            slotSharingGroup,
            coLocationGroup,
            operatorFactory,
            operatorName,
            new ArrayList<OutputSelector<?>>(),
            vertexClass);
    streamNodes.put(vertexID, vertex);
    return vertex;
}
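The chain addSource, addOperator, addNode can be modelled with a small in-memory graph. Everything below is a hypothetical simplification: task classes are represented by their simple names as strings, and serializers, slot sharing groups and type information are omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of StreamGraph.addSource -> addOperator -> addNode.
final class ToyStreamGraphSketch {

    static final class ToyStreamNode {
        final int id;
        final String operatorName;
        final String vertexClass; // simple name of the task class that would run this node

        ToyStreamNode(int id, String operatorName, String vertexClass) {
            this.id = id;
            this.operatorName = operatorName;
            this.vertexClass = vertexClass;
        }
    }

    final Map<Integer, ToyStreamNode> streamNodes = new HashMap<>();
    final List<Integer> sources = new ArrayList<>();

    // addSource = addOperator + remember the id as a source.
    void addSource(int vertexId, String operatorName) {
        addOperator(vertexId, operatorName, true);
        sources.add(vertexId);
    }

    // addOperator picks the task class; isStreamSource stands in for operatorFactory.isStreamSource().
    void addOperator(int vertexId, String operatorName, boolean isStreamSource) {
        addNode(vertexId, operatorName, isStreamSource ? "SourceStreamTask" : "OneInputStreamTask");
    }

    // addNode rejects duplicate ids, then stores the node in the streamNodes map.
    ToyStreamNode addNode(int vertexId, String operatorName, String vertexClass) {
        if (streamNodes.containsKey(vertexId)) {
            throw new RuntimeException("Duplicate vertexID " + vertexId);
        }
        ToyStreamNode vertex = new ToyStreamNode(vertexId, operatorName, vertexClass);
        streamNodes.put(vertexId, vertex);
        return vertex;
    }

    static String demo() {
        ToyStreamGraphSketch graph = new ToyStreamGraphSketch();
        graph.addSource(1, "Source: mySource");
        graph.addOperator(2, "Map", false);
        boolean duplicateRejected = false;
        try {
            graph.addOperator(2, "Map again", false);
        } catch (RuntimeException e) {
            duplicateRejected = true;
        }
        return "sources=" + graph.sources
                + ", node1=" + graph.streamNodes.get(1).vertexClass
                + ", node2=" + graph.streamNodes.get(2).vertexClass
                + ", duplicateRejected=" + duplicateRejected;
    }

    public static void main(String[] args) {
        System.out.println(demo());
        // sources=[1], node1=SourceStreamTask, node2=OneInputStreamTask, duplicateRejected=true
    }
}
```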
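The logic of transformSource is relatively simple: it adds a node to the StreamGraph and registers it as one of the graph's sources. Next, let's look at the more frequently used transformOneInputTransform method.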
A OneInputTransformation has one input, which points to the preceding transformation. Chained together, these references form a linked-list-like structure.
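A minimal sketch of that linked-list shape, using a hypothetical ToyTransformation class with a single input reference: following the input pointers from the last operator leads back to the source.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy: each one-input transformation references its single predecessor.
final class InputChainSketch {

    static final class ToyTransformation {
        final String name;
        final ToyTransformation input; // null marks the source at the head of the chain

        ToyTransformation(String name, ToyTransformation input) {
            this.name = name;
            this.input = input;
        }
    }

    // Follows the input references from `last` back to the source and returns
    // the names in source-first order.
    static String chainFromSource(ToyTransformation last) {
        Deque<String> names = new ArrayDeque<>();
        for (ToyTransformation t = last; t != null; t = t.input) {
            names.addFirst(t.name);
        }
        return String.join(" -> ", names);
    }

    static String demo() {
        ToyTransformation source = new ToyTransformation("Source", null);
        ToyTransformation map = new ToyTransformation("Map", source);
        ToyTransformation filter = new ToyTransformation("Filter", map);
        ToyTransformation process = new ToyTransformation("Process", filter);
        return chainFromSource(process);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // Source -> Map -> Filter -> Process
    }
}
```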
The code of transformOneInputTransform:
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
    // First process this transformation's input transformation
    Collection<Integer> inputIds = transform(transform.getInput());
    // the recursive call might have already transformed this
    // The recursive call above may already have processed this transformation; check again to avoid processing it twice
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }
    // The following steps are similar to transformSource, up to adding the edges
    String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
    streamGraph.addOperator(transform.getId(),
            slotSharingGroup,
            transform.getCoLocationGroupKey(),
            transform.getOperatorFactory(),
            transform.getInputType(),
            transform.getOutputType(),
            transform.getName());
    if (transform.getStateKeySelector() != null) {
        TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
        streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
    }
    int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
            transform.getParallelism() : executionConfig.getParallelism();
    streamGraph.setParallelism(transform.getId(), parallelism);
    streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
    // This is the key step: add an edge connecting the vertex created from this OneInputTransformation
    // with the vertex created from each input transformation
    for (Integer inputId: inputIds) {
        streamGraph.addEdge(inputId, transform.getId(), 0);
    }
    return Collections.singleton(transform.getId());
}
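The "input first, then node, then edges" order of transformSource and transformOneInputTransform can be sketched with a toy generator. The class and the string edge format are hypothetical; partitioners, types and parallelism are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy generator: nodes and edges built from a chain of one-input transformations.
final class OneInputEdgesSketch {

    static final class ToyTransformation {
        final int id;
        final ToyTransformation input; // null means this is a source

        ToyTransformation(int id, ToyTransformation input) {
            this.id = id;
            this.input = input;
        }
    }

    final Set<Integer> nodes = new LinkedHashSet<>();
    final List<String> edges = new ArrayList<>();
    final Map<ToyTransformation, Collection<Integer>> alreadyTransformed = new HashMap<>();

    Collection<Integer> transform(ToyTransformation t) {
        Collection<Integer> cached = alreadyTransformed.get(t);
        if (cached != null) {
            return cached;
        }
        if (t.input == null) {
            // Like transformSource: add a node, no incoming edge.
            nodes.add(t.id);
        } else {
            // Like transformOneInputTransform: transform the input first...
            Collection<Integer> inputIds = transform(t.input);
            // ...then add this node and one edge per input id.
            nodes.add(t.id);
            for (Integer inputId : inputIds) {
                edges.add(inputId + "->" + t.id);
            }
        }
        Collection<Integer> ids = Collections.singleton(t.id);
        alreadyTransformed.put(t, ids);
        return ids;
    }

    static String demo() {
        ToyTransformation source = new ToyTransformation(1, null);
        ToyTransformation map = new ToyTransformation(2, source);
        ToyTransformation sink = new ToyTransformation(3, map);
        OneInputEdgesSketch generator = new OneInputEdgesSketch();
        // In this toy only map and sink are registered; the source is reached through map's input.
        for (ToyTransformation t : Arrays.asList(map, sink)) {
            generator.transform(t);
        }
        return "nodes=" + generator.nodes + " edges=" + generator.edges;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // nodes=[1, 2, 3] edges=[1->2, 2->3]
    }
}
```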
Let's continue with the streamGraph.addEdge method. addEdge in turn calls addEdgeInternal:
private void addEdgeInternal(Integer upStreamVertexID,
        Integer downStreamVertexID,
        int typeNumber,
        StreamPartitioner<?> partitioner,
        List<String> outputNames,
        OutputTag outputTag,
        ShuffleMode shuffleMode) {
    // These virtual-node branches are analyzed later
    if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
        if (outputTag == null) {
            outputTag = virtualSideOutputNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
    } else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
        if (outputNames.isEmpty()) {
            // selections that happen downstream override earlier selections
            outputNames = virtualSelectNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
    } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
        if (partitioner == null) {
            partitioner = virtualPartitionNodes.get(virtualId).f1;
        }
        shuffleMode = virtualPartitionNodes.get(virtualId).f2;
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
    } else {
        // Get the upstream and downstream nodes
        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
        StreamNode downstreamNode = getStreamNode(downStreamVertexID);
        // If no partitioner was specified and the parallelism of upstream and downstream
        // operator matches use forward partitioning, use rebalance otherwise.
        // Partitioner selection, explained below
        if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
            partitioner = new ForwardPartitioner<Object>();
        } else if (partitioner == null) {
            partitioner = new RebalancePartitioner<Object>();
        }
        if (partitioner instanceof ForwardPartitioner) {
            if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                throw new UnsupportedOperationException("Forward partitioning does not allow " +
                        "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                        ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                        " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
            }
        }
        if (shuffleMode == null) {
            shuffleMode = ShuffleMode.UNDEFINED;
        }
        // Create a new StreamEdge
        StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);
        // Attach the new edge to the upstream and downstream nodes
        getStreamNode(edge.getSourceId()).addOutEdge(edge);
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
}
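At the start of the method we see the handling logic for virtualSideOutputNodes, virtualSelectNodes and virtualPartitionNodes. These kinds of transformations are turned into virtual nodes. What is a virtual node? Side output, select and partitioning operations do not require a user-supplied processing function (userFunction), so transformations of these types are turned into virtual nodes. Strictly speaking, a virtual node is not a StreamNode and contains no physical processing logic.
Virtual nodes do not appear in the StreamGraph's processing flow. When an edge is added and its upstream node is virtual, addEdgeInternal recursively follows the upstream references until it reaches a non-virtual node, and only then adds the edge. A virtual node is attached to a non-virtual node through its internal originalId, which is the f0 field of the tuple stored in virtualSideOutputNodes, virtualSelectNodes or virtualPartitionNodes.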
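The recursive resolution of virtual partition nodes can be modelled with a map from a virtual id to an (originalId, partitioner) pair, mirroring the virtualPartitionNodes branch of addEdgeInternal. This is a simplified sketch: partitioners are plain strings, the ids are hypothetical, and side-output, select and shuffle-mode handling are omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the virtualPartitionNodes branch of addEdgeInternal.
final class VirtualNodeSketch {

    // (originalId, partitioner) pair, playing the role of the tuple stored per virtual id.
    static final class VirtualPartition {
        final int originalId;
        final String partitioner;

        VirtualPartition(int originalId, String partitioner) {
            this.originalId = originalId;
            this.partitioner = partitioner;
        }
    }

    final Map<Integer, VirtualPartition> virtualPartitionNodes = new HashMap<>();
    final List<String> edges = new ArrayList<>();

    void addVirtualPartitionNode(int originalId, int virtualId, String partitioner) {
        virtualPartitionNodes.put(virtualId, new VirtualPartition(originalId, partitioner));
    }

    void addEdge(int upstreamId, int downstreamId) {
        addEdgeInternal(upstreamId, downstreamId, null);
    }

    private void addEdgeInternal(int upstreamId, int downstreamId, String partitioner) {
        VirtualPartition virtual = virtualPartitionNodes.get(upstreamId);
        if (virtual != null) {
            // Keep a partitioner chosen further downstream; otherwise take this virtual node's one.
            String p = partitioner != null ? partitioner : virtual.partitioner;
            // Recurse towards the original (possibly also virtual) upstream node.
            addEdgeInternal(virtual.originalId, downstreamId, p);
        } else {
            // Physical upstream node reached: record the edge. "unspecified" stands for the
            // null partitioner that the real code later replaces with forward or rebalance.
            edges.add(upstreamId + "->" + downstreamId + " [" + (partitioner == null ? "unspecified" : partitioner) + "]");
        }
    }

    static String demo() {
        VirtualNodeSketch graph = new VirtualNodeSketch();
        // Hypothetical ids: 1 = source, 2 = map, 4 and 6 = downstream operators.
        graph.addVirtualPartitionNode(2, 3, "hash");      // e.g. a keyBy on top of map
        graph.addVirtualPartitionNode(3, 5, "rebalance"); // a second partitioning step on top of it
        graph.addEdge(1, 2);
        graph.addEdge(3, 4);
        graph.addEdge(5, 6);
        return String.join("; ", graph.edges);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 1->2 [unspecified]; 2->4 [hash]; 2->6 [rebalance]
    }
}
```

Virtual ids 3 and 5 never appear in the resulting edges: both resolve to the physical node 2, and the partitioner closest to the downstream operator wins.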
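The partitioner also needs some explanation. If no partitioner is specified and the upstream and downstream parallelism are equal, a ForwardPartitioner is used, which pushes data directly to the local downstream operator. If no partitioner is specified and the parallelism differs, a RebalancePartitioner is used, which distributes data across the downstream channels in round-robin fashion.
A ForwardPartitioner cannot be used when the upstream and downstream parallelism differ; doing so makes the program throw an UnsupportedOperationException.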
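The partitioner defaulting and the forward check can be restated as a small function, together with a round-robin channel selector that illustrates rebalance. This is a sketch with partitioners represented as strings; the selector starts at channel 0 for simplicity, whereas Flink's RebalancePartitioner may start from a different (for example randomly chosen) channel.

```java
// Hypothetical restatement of the partitioner rules in addEdgeInternal.
final class PartitionerChoiceSketch {

    // Default to forward when parallelism matches, rebalance otherwise;
    // reject forward when the parallelism differs.
    static String choosePartitioner(String specified, int upstreamParallelism, int downstreamParallelism) {
        String partitioner = specified;
        if (partitioner == null) {
            partitioner = upstreamParallelism == downstreamParallelism ? "forward" : "rebalance";
        }
        if ("forward".equals(partitioner) && upstreamParallelism != downstreamParallelism) {
            throw new UnsupportedOperationException(
                    "Forward partitioning does not allow change of parallelism: "
                            + upstreamParallelism + " -> " + downstreamParallelism);
        }
        return partitioner;
    }

    // Simplified round-robin channel selection; this toy always starts at channel 0.
    static String roundRobinSequence(int numberOfChannels, int records) {
        StringBuilder sb = new StringBuilder();
        int next = -1;
        for (int i = 0; i < records; i++) {
            next = (next + 1) % numberOfChannels;
            if (i > 0) {
                sb.append(',');
            }
            sb.append(next);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(choosePartitioner(null, 4, 4)); // forward
        System.out.println(choosePartitioner(null, 1, 4)); // rebalance
        System.out.println(roundRobinSequence(3, 5));      // 0,1,2,0,1
        try {
            choosePartitioner("forward", 2, 4);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```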
Summary in one diagram
Take the following program as an example:
val stream = env.fromElements("hello", "world")
stream.map((_, 0)).keyBy(0).countWindow(1).process(new ProcessWindowFunction[(String, Int), String, Tuple, GlobalWindow] {
  override def process(key: Tuple, context: Context, elements: Iterable[(String, Int)], out: Collector[String]): Unit = {
    for (elem <- elements) {
      out.collect(elem._1)
    }
  }
}).print()
The conversion flow of this program is shown in the following figure:
[Figure: conversion flow of the example program]